Compare commits
No commits in common. "978d94024e539b2b9bcbe80b36fb3307e4b9d6f7" and "41f319b9e200063277ea6b6d7465eafe83d8e030" have entirely different histories.
978d94024e...41f319b9e2
10 changed files with 1354 additions and 1441 deletions
poetry.lock (generated file): 1679 changed lines
File diff suppressed because it is too large.
pyproject.toml
@@ -44,8 +44,6 @@ bytetracker = { git = "https://github.com/rubenvandeven/bytetrack-pip" }
 jsonlines = "^4.0.0"
 tensorboardx = "^2.6.2.2"
 shapely = "^1"
-baumer-neoapi = {path = "../../Downloads/Baumer_neoAPI_1.4.1_lin_x86_64_python/wheel/baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl"}
-qrcode = "^8.0"
 
 [build-system]
 requires = ["poetry-core"]
@@ -170,6 +170,9 @@ class AnimationRenderer:
 
+        self.init_shapes()
+
         self.init_labels()
 
@@ -198,6 +201,52 @@ class AnimationRenderer:
         # return process
 
+    def init_shapes(self):
+        '''
+        Due to error when running headless, we need to configure options before extending the shapes class
+        '''
+        class GradientLine(shapes.Line):
+            def __init__(self, x, y, x2, y2, width=1, color1=[255,255,255], color2=[255,255,255], batch=None, group=None):
+                # print('colors!', colors)
+                # assert len(colors) == 6
+
+                r, g, b, *a = color1
+                self._rgba1 = (r, g, b, a[0] if a else 255)
+                r, g, b, *a = color2
+                self._rgba2 = (r, g, b, a[0] if a else 255)
+
+                # print('rgba', self._rgba)
+
+                super().__init__(x, y, x2, y2, width, color1, batch=None, group=None)
+                # <pyglet.graphics.vertexdomain.VertexList
+                # pyglet.graphics.vertexdomain
+                # print(self._vertex_list)
+
+            def _create_vertex_list(self):
+                '''
+                copy of super()._create_vertex_list but with additional colors'''
+                self._vertex_list = self._group.program.vertex_list(
+                    6, self._draw_mode, self._batch, self._group,
+                    position=('f', self._get_vertices()),
+                    colors=('Bn', self._rgba1 + self._rgba2 + self._rgba2 + self._rgba1 + self._rgba2 + self._rgba1),
+                    translation=('f', (self._x, self._y) * self._num_verts))
+
+            def _update_colors(self):
+                self._vertex_list.colors[:] = self._rgba1 + self._rgba2 + self._rgba2 + self._rgba1 + self._rgba2 + self._rgba1
+
+            def color1(self, color):
+                r, g, b, *a = color
+                self._rgba1 = (r, g, b, a[0] if a else 255)
+                self._update_colors()
+
+            def color2(self, color):
+                r, g, b, *a = color
+                self._rgba2 = (r, g, b, a[0] if a else 255)
+                self._update_colors()
+
+        self.gradientLine = GradientLine
+
     def init_labels(self):
         base_color = COLOR_PRIMARY
         color_predictor = (255,255,0, 255)
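Note on the hunk above: GradientLine is defined inside init_shapes() so that the pyglet.shapes subclass is only created after pyglet has been configured, which the docstring attributes to an error when running headless. A minimal sketch of that ordering (the headless flag is pyglet's EGL mode; treat the exact option name as an assumption if your pyglet version differs):

    import pyglet

    # Configure options before any window or GL context exists; subclassing
    # pyglet.shapes earlier is what the docstring says triggered the error.
    pyglet.options["headless"] = True

    from pyglet import shapes  # imported only after options are set

    # Only now define the subclass, e.g. class GradientLine(shapes.Line): ...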
trap/base.py: 404 deleted lines
@@ -1,404 +0,0 @@
-from __future__ import annotations
-
-from ast import List
-from enum import IntFlag
-from itertools import cycle
-import json
-import logging
-from pathlib import Path
-import time
-from typing import Iterable, Optional
-import cv2
-from dataclasses import dataclass, field
-
-import numpy as np
-from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack
-from deep_sort_realtime.deep_sort.track import TrackState as DeepsortTrackState
-from bytetracker.byte_tracker import STrack as ByteTrackTrack
-from bytetracker.basetrack import TrackState as ByteTrackTrackState
-import pandas as pd
-
-from trap.utils import get_bins, inv_lerp, lerp
-from trajectron.environment import Environment, Node, Scene
-from urllib.parse import urlparse
-
-logger = logging.getLogger('trap.base')
-
-class UrlOrPath():
-    def __init__(self, string):
-        self.url = urlparse(str(string))
-
-    def __str__(self) -> str:
-        return self.url.geturl()
-
-    def is_url(self) -> bool:
-        return len(self.url.netloc) > 0
-
-    def path(self) -> Path:
-        if self.is_url():
-            return Path(self.url.path)
-        return Path(self.url.geturl()) # can include scheme, such as C:/
-
-class Space(IntFlag):
-    Image = 1 # As detected in the image
-    Undistorted = 2 # After applying lense undistortiion
-    World = 4 # After lens undistort and homography
-    Render = 8 # View space of renderer
-
-@dataclass
-class Position:
-    x: float
-    y: float
-    conf: float
-    state: DetectionState
-    frame_nr: int
-    det_class: str
-
-class DetectionState(IntFlag):
-    Tentative = 1 # state before n_init (see DeepsortTrack)
-    Confirmed = 2 # after tentative
-    Lost = 4 # lost when DeepsortTrack.time_since_update > 0 but not Deleted
-    Interpolated = 8 # A position estimated through interpolation of adjecent detections
-
-    @classmethod
-    def from_deepsort_track(cls, track: DeepsortTrack):
-        if track.state == DeepsortTrackState.Tentative:
-            return cls.Tentative
-        if track.state == DeepsortTrackState.Confirmed:
-            if track.time_since_update > 0:
-                return cls.Lost
-            return cls.Confirmed
-        raise RuntimeError("Should not run into Deleted entries here")
-
-    @classmethod
-    def from_bytetrack_track(cls, track: ByteTrackTrack):
-        if track.state == ByteTrackTrackState.New:
-            return cls.Tentative
-        if track.state == ByteTrackTrackState.Lost:
-            return cls.Lost
-        # if track.time_since_update > 0:
-        if track.state == ByteTrackTrackState.Tracked:
-            return cls.Confirmed
-        raise RuntimeError("Should not run into Deleted entries here")
-
-def H_from_path(path: Path):
-    if path.suffix == '.json':
-        with path.open('r') as fp:
-            H = np.array(json.load(fp))
-    else:
-        H = np.loadtxt(path, delimiter=',')
-    return H
-
-@dataclass
-class Camera:
-    mtx: cv2.Mat
-    dist: cv2.Mat
-    w: float
-    h: float
-    H: cv2.Mat # homography
-
-    newcameramtx: cv2.Mat = field(init=False)
-    roi: cv2.typing.Rect = field(init=False)
-    fps: float
-
-    def __post_init__(self):
-        self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w,self.h), 1, (self.w,self.h))
-
-    @classmethod
-    def from_calibfile(cls, calibration_path, H, fps):
-        with calibration_path.open('r') as fp:
-            data = json.load(fp)
-        # print(data)
-        # print(data['camera_matrix'])
-        # camera = {
-        #     'camera_matrix': np.array(data['camera_matrix']),
-        #     'dist_coeff': np.array(data['dist_coeff']),
-        # }
-        return cls(
-            np.array(data['camera_matrix']),
-            np.array(data['dist_coeff']),
-            data['dim']['width'],
-            data['dim']['height'],
-            H, fps)
-
-    @classmethod
-    def from_paths(cls, calibration_path, h_path, fps):
-        H = H_from_path(h_path)
-        return cls.from_calibfile(calibration_path, H, fps)
-
-    # def __init__(self, mtx, dist, w, h, H):
-    #     self.mtx = mtx
-    #     self.dist = dist
-    #     self.w = w
-    #     self.h = h
-    #     self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(mtx, dist, (w,h), 1, (w,h))
-    #     self.H = H # homography
-
-@dataclass
-class Detection:
-    track_id: str # deepsort track id association
-    l: int # left - image space
-    t: int # top - image space
-    w: int # width - image space
-    h: int # height - image space
-    conf: float # object detector probablity
-    state: DetectionState
-    frame_nr: int
-    det_class: str
-
-    def get_foot_coords(self) -> list[float, float]:
-        return [self.l + 0.5 * self.w, self.t+self.h]
-
-    @classmethod
-    def from_deepsort(cls, dstrack: DeepsortTrack, frame_nr: int):
-        return cls(dstrack.track_id, *dstrack.to_ltwh(), dstrack.det_conf, DetectionState.from_deepsort_track(dstrack), frame_nr, dstrack.det_class)
-
-    @classmethod
-    def from_bytetrack(cls, bstrack: ByteTrackTrack, frame_nr: int):
-        return cls(bstrack.track_id, *bstrack.tlwh, bstrack.score, DetectionState.from_bytetrack_track(bstrack), frame_nr, bstrack.cls)
-
-    def get_scaled(self, scale: float = 1):
-        if scale == 1:
-            return self
-
-        return Detection(
-            self.track_id,
-            self.l*scale,
-            self.t*scale,
-            self.w*scale,
-            self.h*scale,
-            self.conf,
-            self.state,
-            self.frame_nr,
-            self.det_class)
-
-    def to_ltwh(self):
-        return (int(self.l), int(self.t), int(self.w), int(self.h))
-
-    def to_ltrb(self):
-        return (int(self.l), int(self.t), int(self.l+self.w), int(self.t+self.h))
-
-@dataclass
-class Track:
-    """A bit of an haphazardous wrapper around the 'real' tracker to provide
-    a history, with which the predictor can work, as we then can deduce velocity
-    and acceleration.
-    """
-    track_id: str = None
-    history: List[Detection] = field(default_factory=list)
-    predictor_history: Optional[list] = None # in image space
-    predictions: Optional[list] = None
-    fps: int = 12 # TODO)) convert this to camera? That way, incorporates H and dist, alternatively, each track is as a whole attached to a space
-    source: Optional[int] = None # to keep track of processed tracks
-
-    def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[Camera]= None) -> np.array:
-        foot_coordinates = [d.get_foot_coords() for d in self.history]
-        # TODO)) Undistort points before perspective transform
-        if len(foot_coordinates):
-            if camera:
-                coords = cv2.undistortPoints(np.array([foot_coordinates]).astype('float32'), camera.mtx, camera.dist, None, camera.newcameramtx)
-                coords = cv2.perspectiveTransform(np.array(coords),camera.H)
-                return coords.reshape((coords.shape[0],2))
-            else:
-                coords = cv2.perspectiveTransform(np.array([foot_coordinates]),H)
-                return coords[0]
-        return np.array([])
-
-    def get_projected_history_as_dict(self, H, camera: Optional[Camera]= None) -> dict:
-        coords = self.get_projected_history(H, camera)
-        return [{"x":c[0], "y":c[1]} for c in coords]
-
-    def get_with_interpolated_history(self) -> Track:
-        # new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr, d.det_class) for l, t, w, h, d in zip(ls,ts,ws,hs, track.history)]
-        # new_track = Track(track.track_id, new_history, track.predictor_history, track.predictions)
-        new_history = []
-        for j in range(len(self.history)):
-            a = self.history[j]
-            new_history.append(Detection(a.track_id, a.l, a.t, a.w, a.h, a.conf, a.state, a.frame_nr, a.det_class))
-
-            if j+1 >= len(self.history):
-                break
-
-            b = self.history[j+1]
-            gap = b.frame_nr - a.frame_nr
-            if gap < 1:
-                logger.error(f"WARNING, gap between frames {a.frame_nr} -> {b.frame_nr} is negative?")
-            if gap > 1:
-                for g in range(1, gap):
-                    l = lerp(a.l, b.l, g/gap)
-                    t = lerp(a.t, b.t, g/gap)
-                    w = lerp(a.w, b.w, g/gap)
-                    h = lerp(a.h, b.h, g/gap)
-                    conf = 0
-                    state = DetectionState.Lost
-                    frame_nr = a.frame_nr + g
-                    new_history.append(Detection(a.track_id, l, t, w, h, conf, state, frame_nr, a.det_class))
-
-        return Track(
-            self.track_id,
-            new_history,
-            self.predictor_history,
-            self.predictions,
-            self.fps)
-
-    def is_complete(self):
-        diffs = [(b.frame_nr - a.frame_nr) for a,b in zip(self.history[:-1], self.history[1:])]
-        return any([d != 1 for d in diffs])
-
-    def get_sampled(self, step_size = 1, offset=0):
-        if not self.is_complete():
-            t = self.get_with_interpolated_history()
-        else:
-            t = self
-
-        return Track(
-            t.track_id,
-            t.history[offset::step_size],
-            t.predictor_history,
-            t.predictions,
-            t.fps/step_size)
-
-    def get_binned(self, bin_size, camera: Camera, bin_start=True):
-        """
-        For an experiment: what if we predict using only concrete positions, by mapping
-        dx,dy to a grid. Thus prediction can be for 8 moves, or rather headings
-        see ~/notes/attachments example svg
-        """
-
-        history = self.get_projected_history_as_dict(H=None, camera=camera)
-
-        def round_to_grid_precision(x):
-            factor = 1/bin_size
-            return round(x * factor) / factor
-
-        new_history: List[dict] = []
-        for i, (det0, det1) in enumerate(zip(history[:-1], history[1:])):
-            if i == 0:
-                new_history.append({
-                    'x': round_to_grid_precision(det0['x']),
-                    'y': round_to_grid_precision(det0['y'])
-                } if bin_start else det0)
-                continue
-            if abs(det1['x'] - new_history[-1]['x']) < bin_size and abs(det1['y'] - new_history[-1]['y']) < bin_size:
-                continue
-
-            # det1 falls outside of the box [-bin_size:+bin_size] around last detection
-
-            # 1. Interpolate exact point between det0 and det1 that this happens
-            if abs(det1['x'] - new_history[-1]['x']) >= bin_size:
-                if det1['x'] - new_history[-1]['x'] >= bin_size:
-                    # det1 left of last
-                    x = new_history[-1]['x'] + bin_size
-                    f = inv_lerp(det0['x'], det1['x'], x)
-                elif new_history[-1]['x'] - det1['x'] >= bin_size:
-                    # det1 left of last
-                    x = new_history[-1]['x'] - bin_size
-                    f = inv_lerp(det0['x'], det1['x'], x)
-                y = lerp(det0['y'], det1['y'], f)
-            if abs(det1['y'] - new_history[-1]['y']) >= bin_size:
-                if det1['y'] - new_history[-1]['y'] >= bin_size:
-                    # det1 left of last
-                    y = new_history[-1]['y'] + bin_size
-                    f = inv_lerp(det0['y'], det1['y'], y)
-                elif new_history[-1]['y'] - det1['y'] >= bin_size:
-                    # det1 left of last
-                    y = new_history[-1]['y'] - bin_size
-                    f = inv_lerp(det0['y'], det1['y'], y)
-                x = lerp(det0['x'], det1['x'], f)
-
-            # 2. Find closest point on rectangle (rectangle's four corners, or 4 midpoints)
-            points = get_bins(bin_size)
-            points = [[new_history[-1]['x']+p[0], new_history[-1]['y'] + p[1]] for p in points]
-
-            distances = [np.linalg.norm([p[0] - x, p[1]-y]) for p in points]
-            closest = np.argmin(distances)
-
-            point = points[closest]
-
-            new_history.append({'x': point[0], 'y':point[1]})
-            # todo Offsets to points:[ history for in points]
-        return new_history
-
-    def to_trajectron_node(self, camera: Camera, env: Environment) -> Node:
-        positions = self.get_projected_history(None, camera)
-        velocity = np.gradient(positions, 1/self.fps, axis=0)
-        acceleration = np.gradient(velocity, 1/self.fps, axis=0)
-
-        new_first_idx = self.history[0].frame_nr
-
-        data_columns = pd.MultiIndex.from_product([['position', 'velocity', 'acceleration'], ['x', 'y']])
-
-        # vx = derivative_of(x, scene.dt)
-        # vy = derivative_of(y, scene.dt)
-        # ax = derivative_of(vx, scene.dt)
-        # ay = derivative_of(vy, scene.dt)
-
-        data_dict = {
-            ('position', 'x'): positions[:,0],
-            ('position', 'y'): positions[:,1],
-            ('velocity', 'x'): velocity[:,0],
-            ('velocity', 'y'): velocity[:,1],
-            ('acceleration', 'x'): acceleration[:,0],
-            ('acceleration', 'y'): acceleration[:,1]
-        }
-
-        node_data = pd.DataFrame(data_dict, columns=data_columns)
-
-        return Node(node_type=env.NodeType.PEDESTRIAN, node_id=self.track_id, data=node_data, first_timestep=new_first_idx)
-
-@dataclass
-class Frame:
-    index: int
-    img: np.array
-    time: float= field(default_factory=lambda: time.time())
-    tracks: Optional[dict[str, Track]] = None
-    H: Optional[np.array] = None
-    camera: Optional[Camera] = None
-    maps: Optional[List[cv2.Mat]] = None
-
-    def aslist(self) -> List[dict]:
-        return { t.track_id:
-            {
-                'id': t.track_id,
-                'history': t.get_projected_history(self.H).tolist(),
-                'det_conf': t.history[-1].conf,
-                # 'det_conf': trajectory_data[node.id]['det_conf'],
-                # 'bbox': trajectory_data[node.id]['bbox'],
-                # 'history': history.tolist(),
-                'predictions': t.predictions
-            } for t in self.tracks.values()
-        }
-
-    def without_img(self):
-        return Frame(self.index, None, self.time, self.tracks, self.H, self.camera, self.maps)
-
-def video_src_from_config(config) -> Iterable[UrlOrPath]:
-    if config.video_loop:
-        video_srcs: Iterable[UrlOrPath] = cycle(config.video_src)
-    else:
-        video_srcs: Iterable[UrlOrPath] = config.video_src
-    return video_srcs
-
-@dataclass
-class Trajectory:
-    # TODO)) Replace history and predictions in Track with Trajectory
-    space: Space
-    fps: int = 12
-    points: List[Detection] = field(default_factory=list)
-
-    def __iter__(self):
-        for d in self.points:
-            yield d
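For reference, get_with_interpolated_history() above fills frame gaps by linear interpolation via the lerp/inv_lerp helpers imported from trap.utils. A self-contained sketch of that gap-filling arithmetic (the helper bodies below are the conventional definitions, assumed rather than copied from trap.utils):

    def lerp(a: float, b: float, t: float) -> float:
        # linear interpolation: a at t=0, b at t=1
        return a + (b - a) * t

    def inv_lerp(a: float, b: float, v: float) -> float:
        # inverse lerp: the fraction t at which v lies between a and b
        return (v - a) / (b - a)

    # A detection at frame 10 (left=100.0) followed by one at frame 13
    # (left=130.0) leaves a gap of 3; frames 11 and 12 are synthesized
    # at fractions 1/3 and 2/3, exactly as in the g/gap loop above.
    a_l, b_l, gap = 100.0, 130.0, 3
    assert [lerp(a_l, b_l, g / gap) for g in range(1, gap)] == [110.0, 120.0]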
@ -353,9 +353,6 @@ render_parser.add_argument("--render-hide-stats",
|
||||||
render_parser.add_argument("--full-screen",
|
render_parser.add_argument("--full-screen",
|
||||||
help="Set Window full screen",
|
help="Set Window full screen",
|
||||||
action='store_true')
|
action='store_true')
|
||||||
render_parser.add_argument("--render-clusters",
|
|
||||||
help="renders arrowd clusters instead of individual predictions",
|
|
||||||
action='store_true')
|
|
||||||
|
|
||||||
render_parser.add_argument("--render-url",
|
render_parser.add_argument("--render-url",
|
||||||
help="""Stream renderer on given URL. Two easy approaches:
|
help="""Stream renderer on given URL. Two easy approaches:
|
||||||
|
|
|
@@ -392,7 +392,7 @@ class CvRenderer:
             if first_time is None:
                 first_time = frame.time
 
-            img = decorate_frame(frame, tracker_frame, prediction_frame, first_time, self.config, self.tracks, self.predictions, self.config.render_clusters)
+            img = decorate_frame(frame, tracker_frame, prediction_frame, first_time, self.config, self.tracks, self.predictions)
 
             logger.debug(f"write frame {frame.time - first_time:.3f}s")
             if self.out_writer:
@@ -456,7 +456,7 @@ def get_animation_position(track: Track, current_frame: Frame):
 
 
 # Deprecated
-def decorate_frame(frame: Frame, tracker_frame: Frame, prediction_frame: Frame, first_time: float, config: Namespace, tracks: Dict[str, Track], predictions: Dict[str, Track], as_clusters = True) -> np.array:
+def decorate_frame(frame: Frame, tracker_frame: Frame, prediction_frame: Frame, first_time: float, config: Namespace, tracks: Dict[str, Track], predictions: Dict[str, Track]) -> np.array:
     # TODO: replace opencv with QPainter to support alpha? https://doc.qt.io/qtforpython-5/PySide2/QtGui/QPainter.html#PySide2.QtGui.PySide2.QtGui.QPainter.drawImage
     # or https://github.com/pygobject/pycairo?tab=readme-ov-file
     # or https://pyglet.readthedocs.io/en/latest/programming_guide/shapes.html
@@ -498,10 +498,10 @@ def decorate_frame(frame: Frame, tracker_frame: Frame, prediction_frame: Frame,
     else:
         for track_id, track in predictions.items():
             inv_H = np.linalg.pinv(prediction_frame.H)
-            # For debugging:
-            # draw_trackjectron_history(img, track, int(track.track_id), convert_world_points_to_img_points)
+            # draw_track(img, track, int(track_id))
+            draw_trackjectron_history(img, track, int(track.track_id), convert_world_points_to_img_points)
             anim_position = get_animation_position(track, frame)
-            draw_track_predictions(img, track, int(track.track_id)+1, config.camera, convert_world_points_to_img_points, anim_position=anim_position, as_clusters=as_clusters)
+            draw_track_predictions(img, track, int(track.track_id)+1, config.camera, convert_world_points_to_img_points, anim_position=anim_position, as_clusters=True)
             cv2.putText(img, f"{len(track.predictor_history) if track.predictor_history else 'none'}", to_point(track.history[0].get_foot_coords()), cv2.FONT_HERSHEY_COMPLEX, 1, (255,255,255), 1)
             if prediction_frame.maps:
                 for i, m in enumerate(prediction_frame.maps):
trap/frame_emitter.py
@@ -25,10 +25,8 @@ from bytetracker.basetrack import TrackState as ByteTrackTrackState
 from trajectron.environment import Environment, Node, Scene
 from urllib.parse import urlparse
 
-from trap.base import *
 from trap.utils import get_bins
 from trap.utils import inv_lerp, lerp
-from trap.video_sources import get_video_source
 
 logger = logging.getLogger('trap.frame_emitter')
 
@ -58,8 +56,380 @@ class DataclassJSONEncoder(json.JSONEncoder):
|
||||||
return super().default(o)
|
return super().default(o)
|
||||||
|
|
||||||
|
|
||||||
|
class UrlOrPath():
|
||||||
|
def __init__(self, string):
|
||||||
|
self.url = urlparse(str(string))
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return self.url.geturl()
|
||||||
|
|
||||||
|
def is_url(self) -> bool:
|
||||||
|
return len(self.url.netloc) > 0
|
||||||
|
|
||||||
|
def path(self) -> Path:
|
||||||
|
if self.is_url():
|
||||||
|
return Path(self.url.path)
|
||||||
|
return Path(self.url.geturl()) # can include scheme, such as C:/
|
||||||
|
|
||||||
|
class Space(IntFlag):
|
||||||
|
Image = 1 # As detected in the image
|
||||||
|
Undistorted = 2 # After applying lense undistortiion
|
||||||
|
World = 4 # After lens undistort and homography
|
||||||
|
Render = 8 # View space of renderer
|
||||||
|
|
||||||
|
class DetectionState(IntFlag):
|
||||||
|
Tentative = 1 # state before n_init (see DeepsortTrack)
|
||||||
|
Confirmed = 2 # after tentative
|
||||||
|
Lost = 4 # lost when DeepsortTrack.time_since_update > 0 but not Deleted
|
||||||
|
Interpolated = 8 # A position estimated through interpolation of adjecent detections
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_deepsort_track(cls, track: DeepsortTrack):
|
||||||
|
if track.state == DeepsortTrackState.Tentative:
|
||||||
|
return cls.Tentative
|
||||||
|
if track.state == DeepsortTrackState.Confirmed:
|
||||||
|
if track.time_since_update > 0:
|
||||||
|
return cls.Lost
|
||||||
|
return cls.Confirmed
|
||||||
|
raise RuntimeError("Should not run into Deleted entries here")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytetrack_track(cls, track: ByteTrackTrack):
|
||||||
|
if track.state == ByteTrackTrackState.New:
|
||||||
|
return cls.Tentative
|
||||||
|
if track.state == ByteTrackTrackState.Lost:
|
||||||
|
return cls.Lost
|
||||||
|
# if track.time_since_update > 0:
|
||||||
|
if track.state == ByteTrackTrackState.Tracked:
|
||||||
|
return cls.Confirmed
|
||||||
|
raise RuntimeError("Should not run into Deleted entries here")
|
||||||
|
|
||||||
|
def H_from_path(path: Path):
|
||||||
|
if path.suffix == '.json':
|
||||||
|
with path.open('r') as fp:
|
||||||
|
H = np.array(json.load(fp))
|
||||||
|
else:
|
||||||
|
H = np.loadtxt(path, delimiter=',')
|
||||||
|
return H
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Camera:
|
||||||
|
mtx: cv2.Mat
|
||||||
|
dist: cv2.Mat
|
||||||
|
w: float
|
||||||
|
h: float
|
||||||
|
H: cv2.Mat # homography
|
||||||
|
|
||||||
|
newcameramtx: cv2.Mat = field(init=False)
|
||||||
|
roi: cv2.typing.Rect = field(init=False)
|
||||||
|
fps: float
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w,self.h), 1, (self.w,self.h))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_calibfile(cls, calibration_path, H, fps):
|
||||||
|
with calibration_path.open('r') as fp:
|
||||||
|
data = json.load(fp)
|
||||||
|
# print(data)
|
||||||
|
# print(data['camera_matrix'])
|
||||||
|
# camera = {
|
||||||
|
# 'camera_matrix': np.array(data['camera_matrix']),
|
||||||
|
# 'dist_coeff': np.array(data['dist_coeff']),
|
||||||
|
# }
|
||||||
|
return cls(
|
||||||
|
np.array(data['camera_matrix']),
|
||||||
|
np.array(data['dist_coeff']),
|
||||||
|
data['dim']['width'],
|
||||||
|
data['dim']['height'],
|
||||||
|
H, fps)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_paths(cls, calibration_path, h_path, fps):
|
||||||
|
H = H_from_path(h_path)
|
||||||
|
return cls.from_calibfile(calibration_path, H, fps)
|
||||||
|
|
||||||
|
# def __init__(self, mtx, dist, w, h, H):
|
||||||
|
# self.mtx = mtx
|
||||||
|
# self.dist = dist
|
||||||
|
# self.w = w
|
||||||
|
# self.h = h
|
||||||
|
# self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(mtx, dist, (w,h), 1, (w,h))
|
||||||
|
# self.H = H # homography
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Position:
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
conf: float
|
||||||
|
state: DetectionState
|
||||||
|
frame_nr: int
|
||||||
|
det_class: str
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Detection:
|
||||||
|
track_id: str # deepsort track id association
|
||||||
|
l: int # left - image space
|
||||||
|
t: int # top - image space
|
||||||
|
w: int # width - image space
|
||||||
|
h: int # height - image space
|
||||||
|
conf: float # object detector probablity
|
||||||
|
state: DetectionState
|
||||||
|
frame_nr: int
|
||||||
|
det_class: str
|
||||||
|
|
||||||
|
def get_foot_coords(self) -> list[float, float]:
|
||||||
|
return [self.l + 0.5 * self.w, self.t+self.h]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_deepsort(cls, dstrack: DeepsortTrack, frame_nr: int):
|
||||||
|
return cls(dstrack.track_id, *dstrack.to_ltwh(), dstrack.det_conf, DetectionState.from_deepsort_track(dstrack), frame_nr, dstrack.det_class)
|
||||||
|
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytetrack(cls, bstrack: ByteTrackTrack, frame_nr: int):
|
||||||
|
return cls(bstrack.track_id, *bstrack.tlwh, bstrack.score, DetectionState.from_bytetrack_track(bstrack), frame_nr, bstrack.cls)
|
||||||
|
|
||||||
|
def get_scaled(self, scale: float = 1):
|
||||||
|
if scale == 1:
|
||||||
|
return self
|
||||||
|
|
||||||
|
return Detection(
|
||||||
|
self.track_id,
|
||||||
|
self.l*scale,
|
||||||
|
self.t*scale,
|
||||||
|
self.w*scale,
|
||||||
|
self.h*scale,
|
||||||
|
self.conf,
|
||||||
|
self.state,
|
||||||
|
self.frame_nr,
|
||||||
|
self.det_class)
|
||||||
|
|
||||||
|
def to_ltwh(self):
|
||||||
|
return (int(self.l), int(self.t), int(self.w), int(self.h))
|
||||||
|
|
||||||
|
def to_ltrb(self):
|
||||||
|
return (int(self.l), int(self.t), int(self.l+self.w), int(self.t+self.h))
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Trajectory:
|
||||||
|
# TODO)) Replace history and predictions in Track with Trajectory
|
||||||
|
space: Space
|
||||||
|
fps: int = 12
|
||||||
|
points: List[Detection] = field(default_factory=list)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
for d in self.points:
|
||||||
|
yield d
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Track:
|
||||||
|
"""A bit of an haphazardous wrapper around the 'real' tracker to provide
|
||||||
|
a history, with which the predictor can work, as we then can deduce velocity
|
||||||
|
and acceleration.
|
||||||
|
"""
|
||||||
|
track_id: str = None
|
||||||
|
history: List[Detection] = field(default_factory=list)
|
||||||
|
predictor_history: Optional[list] = None # in image space
|
||||||
|
predictions: Optional[list] = None
|
||||||
|
fps: int = 12 # TODO)) convert this to camera? That way, incorporates H and dist, alternatively, each track is as a whole attached to a space
|
||||||
|
source: Optional[int] = None # to keep track of processed tracks
|
||||||
|
|
||||||
|
def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[Camera]= None) -> np.array:
|
||||||
|
foot_coordinates = [d.get_foot_coords() for d in self.history]
|
||||||
|
# TODO)) Undistort points before perspective transform
|
||||||
|
if len(foot_coordinates):
|
||||||
|
if camera:
|
||||||
|
coords = cv2.undistortPoints(np.array([foot_coordinates]).astype('float32'), camera.mtx, camera.dist, None, camera.newcameramtx)
|
||||||
|
coords = cv2.perspectiveTransform(np.array(coords),camera.H)
|
||||||
|
return coords.reshape((coords.shape[0],2))
|
||||||
|
else:
|
||||||
|
coords = cv2.perspectiveTransform(np.array([foot_coordinates]),H)
|
||||||
|
return coords[0]
|
||||||
|
return np.array([])
|
||||||
|
|
||||||
|
def get_projected_history_as_dict(self, H, camera: Optional[Camera]= None) -> dict:
|
||||||
|
coords = self.get_projected_history(H, camera)
|
||||||
|
return [{"x":c[0], "y":c[1]} for c in coords]
|
||||||
|
|
||||||
|
def get_with_interpolated_history(self) -> Track:
|
||||||
|
# new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr, d.det_class) for l, t, w, h, d in zip(ls,ts,ws,hs, track.history)]
|
||||||
|
# new_track = Track(track.track_id, new_history, track.predictor_history, track.predictions)
|
||||||
|
new_history = []
|
||||||
|
for j in range(len(self.history)):
|
||||||
|
a = self.history[j]
|
||||||
|
new_history.append(Detection(a.track_id, a.l, a.t, a.w, a.h, a.conf, a.state, a.frame_nr, a.det_class))
|
||||||
|
|
||||||
|
if j+1 >= len(self.history):
|
||||||
|
break
|
||||||
|
|
||||||
|
b = self.history[j+1]
|
||||||
|
gap = b.frame_nr - a.frame_nr
|
||||||
|
if gap < 1:
|
||||||
|
logger.error(f"WARNING, gap between frames {a.frame_nr} -> {b.frame_nr} is negative?")
|
||||||
|
if gap > 1:
|
||||||
|
for g in range(1, gap):
|
||||||
|
l = lerp(a.l, b.l, g/gap)
|
||||||
|
t = lerp(a.t, b.t, g/gap)
|
||||||
|
w = lerp(a.w, b.w, g/gap)
|
||||||
|
h = lerp(a.h, b.h, g/gap)
|
||||||
|
conf = 0
|
||||||
|
state = DetectionState.Lost
|
||||||
|
frame_nr = a.frame_nr + g
|
||||||
|
new_history.append(Detection(a.track_id, l, t, w, h, conf, state, frame_nr, a.det_class))
|
||||||
|
|
||||||
|
return Track(
|
||||||
|
self.track_id,
|
||||||
|
new_history,
|
||||||
|
self.predictor_history,
|
||||||
|
self.predictions,
|
||||||
|
self.fps)
|
||||||
|
|
||||||
|
def is_complete(self):
|
||||||
|
diffs = [(b.frame_nr - a.frame_nr) for a,b in zip(self.history[:-1], self.history[1:])]
|
||||||
|
return any([d != 1 for d in diffs])
|
||||||
|
|
||||||
|
|
||||||
|
def get_sampled(self, step_size = 1, offset=0):
|
||||||
|
if not self.is_complete():
|
||||||
|
t = self.get_with_interpolated_history()
|
||||||
|
else:
|
||||||
|
t = self
|
||||||
|
|
||||||
|
return Track(
|
||||||
|
t.track_id,
|
||||||
|
t.history[offset::step_size],
|
||||||
|
t.predictor_history,
|
||||||
|
t.predictions,
|
||||||
|
t.fps/step_size)
|
||||||
|
|
||||||
|
def get_binned(self, bin_size, camera: Camera, bin_start=True):
|
||||||
|
"""
|
||||||
|
For an experiment: what if we predict using only concrete positions, by mapping
|
||||||
|
dx,dy to a grid. Thus prediction can be for 8 moves, or rather headings
|
||||||
|
see ~/notes/attachments example svg
|
||||||
|
"""
|
||||||
|
|
||||||
|
history = self.get_projected_history_as_dict(H=None, camera=camera)
|
||||||
|
|
||||||
|
def round_to_grid_precision(x):
|
||||||
|
factor = 1/bin_size
|
||||||
|
return round(x * factor) / factor
|
||||||
|
|
||||||
|
new_history: List[dict] = []
|
||||||
|
for i, (det0, det1) in enumerate(zip(history[:-1], history[1:])):
|
||||||
|
if i == 0:
|
||||||
|
new_history.append({
|
||||||
|
'x': round_to_grid_precision(det0['x']),
|
||||||
|
'y': round_to_grid_precision(det0['y'])
|
||||||
|
} if bin_start else det0)
|
||||||
|
continue
|
||||||
|
if abs(det1['x'] - new_history[-1]['x']) < bin_size and abs(det1['y'] - new_history[-1]['y']) < bin_size:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# det1 falls outside of the box [-bin_size:+bin_size] around last detection
|
||||||
|
|
||||||
|
# 1. Interpolate exact point between det0 and det1 that this happens
|
||||||
|
if abs(det1['x'] - new_history[-1]['x']) >= bin_size:
|
||||||
|
if det1['x'] - new_history[-1]['x'] >= bin_size:
|
||||||
|
# det1 left of last
|
||||||
|
x = new_history[-1]['x'] + bin_size
|
||||||
|
f = inv_lerp(det0['x'], det1['x'], x)
|
||||||
|
elif new_history[-1]['x'] - det1['x'] >= bin_size:
|
||||||
|
# det1 left of last
|
||||||
|
x = new_history[-1]['x'] - bin_size
|
||||||
|
f = inv_lerp(det0['x'], det1['x'], x)
|
||||||
|
y = lerp(det0['y'], det1['y'], f)
|
||||||
|
if abs(det1['y'] - new_history[-1]['y']) >= bin_size:
|
||||||
|
if det1['y'] - new_history[-1]['y'] >= bin_size:
|
||||||
|
# det1 left of last
|
||||||
|
y = new_history[-1]['y'] + bin_size
|
||||||
|
f = inv_lerp(det0['y'], det1['y'], y)
|
||||||
|
elif new_history[-1]['y'] - det1['y'] >= bin_size:
|
||||||
|
# det1 left of last
|
||||||
|
y = new_history[-1]['y'] - bin_size
|
||||||
|
f = inv_lerp(det0['y'], det1['y'], y)
|
||||||
|
x = lerp(det0['x'], det1['x'], f)
|
||||||
|
|
||||||
|
|
||||||
|
# 2. Find closest point on rectangle (rectangle's four corners, or 4 midpoints)
|
||||||
|
points = get_bins(bin_size)
|
||||||
|
points = [[new_history[-1]['x']+p[0], new_history[-1]['y'] + p[1]] for p in points]
|
||||||
|
|
||||||
|
distances = [np.linalg.norm([p[0] - x, p[1]-y]) for p in points]
|
||||||
|
closest = np.argmin(distances)
|
||||||
|
|
||||||
|
point = points[closest]
|
||||||
|
|
||||||
|
new_history.append({'x': point[0], 'y':point[1]})
|
||||||
|
# todo Offsets to points:[ history for in points]
|
||||||
|
return new_history
|
||||||
|
|
||||||
|
|
||||||
|
def to_trajectron_node(self, camera: Camera, env: Environment) -> Node:
|
||||||
|
positions = self.get_projected_history(None, camera)
|
||||||
|
velocity = np.gradient(positions, 1/self.fps, axis=0)
|
||||||
|
acceleration = np.gradient(velocity, 1/self.fps, axis=0)
|
||||||
|
|
||||||
|
new_first_idx = self.history[0].frame_nr
|
||||||
|
|
||||||
|
data_columns = pd.MultiIndex.from_product([['position', 'velocity', 'acceleration'], ['x', 'y']])
|
||||||
|
|
||||||
|
|
||||||
|
# vx = derivative_of(x, scene.dt)
|
||||||
|
# vy = derivative_of(y, scene.dt)
|
||||||
|
# ax = derivative_of(vx, scene.dt)
|
||||||
|
# ay = derivative_of(vy, scene.dt)
|
||||||
|
|
||||||
|
data_dict = {
|
||||||
|
('position', 'x'): positions[:,0],
|
||||||
|
('position', 'y'): positions[:,1],
|
||||||
|
('velocity', 'x'): velocity[:,0],
|
||||||
|
('velocity', 'y'): velocity[:,1],
|
||||||
|
('acceleration', 'x'): acceleration[:,0],
|
||||||
|
('acceleration', 'y'): acceleration[:,1]
|
||||||
|
}
|
||||||
|
|
||||||
|
node_data = pd.DataFrame(data_dict, columns=data_columns)
|
||||||
|
|
||||||
|
return Node(node_type=env.NodeType.PEDESTRIAN, node_id=self.track_id, data=node_data, first_timestep=new_first_idx)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Frame:
|
||||||
|
index: int
|
||||||
|
img: np.array
|
||||||
|
time: float= field(default_factory=lambda: time.time())
|
||||||
|
tracks: Optional[dict[str, Track]] = None
|
||||||
|
H: Optional[np.array] = None
|
||||||
|
camera: Optional[Camera] = None
|
||||||
|
maps: Optional[List[cv2.Mat]] = None
|
||||||
|
|
||||||
|
def aslist(self) -> [dict]:
|
||||||
|
return { t.track_id:
|
||||||
|
{
|
||||||
|
'id': t.track_id,
|
||||||
|
'history': t.get_projected_history(self.H).tolist(),
|
||||||
|
'det_conf': t.history[-1].conf,
|
||||||
|
# 'det_conf': trajectory_data[node.id]['det_conf'],
|
||||||
|
# 'bbox': trajectory_data[node.id]['bbox'],
|
||||||
|
# 'history': history.tolist(),
|
||||||
|
'predictions': t.predictions
|
||||||
|
} for t in self.tracks.values()
|
||||||
|
}
|
||||||
|
|
||||||
|
def without_img(self):
|
||||||
|
return Frame(self.index, None, self.time, self.tracks, self.H, self.camera, self.maps)
|
||||||
|
|
||||||
|
def video_src_from_config(config) -> UrlOrPath:
|
||||||
|
if config.video_loop:
|
||||||
|
video_srcs: Iterable[UrlOrPath] = cycle(config.video_src)
|
||||||
|
else:
|
||||||
|
video_srcs: Iterable[UrlOrPath] = config.video_src
|
||||||
|
return video_srcs
|
||||||
|
|
||||||
class FrameEmitter:
|
class FrameEmitter:
|
||||||
'''
|
'''
|
||||||
|
@@ -88,20 +458,93 @@ class FrameEmitter:
 
     def emit_video(self, timer_counter):
         i = 0
+        delay_generation = False
+        for video_path in self.video_srcs:
+            logger.info(f"Play from '{str(video_path)}'")
+            if str(video_path).isdigit():
+                # numeric input is a CV camera
+                video = cv2.VideoCapture(int(str(video_path)))
+                # TODO: make config variables
+                video.set(cv2.CAP_PROP_FRAME_WIDTH, int(self.config.camera.w))
+                video.set(cv2.CAP_PROP_FRAME_HEIGHT, int(self.config.camera.h))
+                print("exposure!", video.get(cv2.CAP_PROP_AUTO_EXPOSURE))
+                video.set(cv2.CAP_PROP_FPS, 5)
+                fps=5
+            elif video_path.url.scheme == 'rtsp':
+                gst = f"rtspsrc location={video_path} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=0 drop=true"
+                logger.info(f"Capture gstreamer (gst-launch-1.0): {gst}")
+                video = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)
+                fps=12
+            else:
+                # os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"
+                video = cv2.VideoCapture(str(video_path))
+                delay_generation = True
+                fps = video.get(cv2.CAP_PROP_FPS)
+                target_frame_duration = 1./fps
+            logger.info(f"Emit frames at {fps} fps")
 
-        source = get_video_source(self.video_srcs, self.config.camera)
-        for i, img in enumerate(source):
+            if self.config.video_offset:
+                logger.info(f"Start at frame {self.config.video_offset}")
+                video.set(cv2.CAP_PROP_POS_FRAMES, self.config.video_offset)
+                i = self.config.video_offset
+
+            # if '-' in video_path.path().stem:
+            #     path_stem = video_path.stem[:video_path.stem.rfind('-')]
+            # else:
+            #     path_stem = video_path.stem
+            # path_stem += "-homography"
+            # homography_path = video_path.with_stem(path_stem).with_suffix('.txt')
+            # logger.info(f'check homography file {homography_path}')
+            # if homography_path.exists():
+            #     logger.info(f'Found custom homography file! Using {homography_path}')
+            #     video_H = np.loadtxt(homography_path, delimiter=',')
+            # else:
+            #     video_H = None
+            video_H = self.config.camera.H
+
+            prev_time = time.time()
+
+            while self.is_running.is_set():
                 with timer_counter.get_lock():
                     timer_counter.value += 1
 
-            frame = Frame(i, img=img, H=self.config.camera.H, camera=self.config.camera)
+                ret, img = video.read()
+
+                # seek to 0 if video has finished. Infinite loop
+                if not ret:
+                    # now loading multiple files
+                    break
+                    # video.set(cv2.CAP_PROP_POS_FRAMES, 0)
+                    # ret, img = video.read()
+                    # assert ret is not False # not really error proof...
+
+                if "DATASETS/hof/" in str(video_path):
+                    # hack to mask out area
+                    cv2.rectangle(img, (0,0), (800,200), (0,0,0), -1)
+
+                frame = Frame(index=i, img=img, H=self.config.H, camera=self.config.camera)
                 # TODO: this is very dirty, need to find another way.
                 # perhaps multiprocessing Array?
                 self.frame_noimg_sock.send(pickle.dumps(frame.without_img()))
                 self.frame_sock.send(pickle.dumps(frame))
 
+                # only delay consuming the next frame when using a file.
+                # Otherwise, go ASAP
+                if delay_generation:
+                    # defer next loop
+                    now = time.time()
+                    time_diff = (now - prev_time)
+                    if time_diff < target_frame_duration:
+                        time.sleep(target_frame_duration - time_diff)
+                        now += target_frame_duration - time_diff
+
+                    prev_time = now
+
+                i += 1
+
             if not self.is_running.is_set():
                 # if not running, also break out of infinite generator loop
                 break
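The delay_generation branch added above throttles file playback to the source frame rate, while camera and RTSP input is consumed as fast as it arrives. A condensed, self-contained sketch of that pacing loop (the fps value and the frame producer are placeholders, not repository code):

    import time

    fps = 12                          # placeholder; files read cv2.CAP_PROP_FPS
    target_frame_duration = 1. / fps
    prev_time = time.time()

    for _ in range(3):                # stands in for the video.read() loop
        # ... emit one frame here ...
        now = time.time()
        time_diff = now - prev_time
        if time_diff < target_frame_duration:
            # sleep off the remainder so emission stays near the target fps
            time.sleep(target_frame_duration - time_diff)
            now += target_frame_duration - time_diff
        prev_time = now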
@@ -205,9 +205,7 @@ class DrawnTrack:
             if draw_dot:
                 line = pyglet.shapes.Arc(x2, y2, 10, thickness=2, color=color, batch=self.renderer.batch_anim)
             else:
-                # line = self.renderer.gradientLine(x, y, x2, y2, 3, color, color, batch=self.renderer.batch_anim)
-                line = pyglet.shapes.Line(x, y, x2, y2, 3, color, batch=self.renderer.batch_anim)
-                # line = self.renderer.gradientLine(x, y, x2, y2, 3, color, color, batch=self.renderer.batch_anim)
+                line = self.renderer.gradientLine(x, y, x2, y2, 3, color, color, batch=self.renderer.batch_anim)
             line.opacity = 20 if not for_laser else 255
             self.shapes.append(line)
 
@@ -362,12 +362,11 @@ def draw_track_predictions(img: cv2.Mat, track: Track, color_index: int, camera:
         # cv2 only draws to integer coordinates
         points = np.rint(points).astype(int)
         thickness = max(1, int(cluster.probability * 6))
-        thickness=1
-        # if len(cluster.next_point_clusters) == 1:
+        if len(cluster.next_point_clusters) == 1:
             # not a final point, nor a split:
             cv2.line(img, points[0], points[1], color, thickness, lineType=cv2.LINE_AA)
-        # else:
-        #     cv2.arrowedLine(img, points[0], points[1], color, thickness, cv2.LINE_AA)
+        else:
+            cv2.arrowedLine(img, points[0], points[1], color, thickness, cv2.LINE_AA)
 
         for sub in cluster.next_point_clusters:
             draw_cluster(img, sub)
@ -1,170 +0,0 @@
|
||||||
import logging
|
|
||||||
import math
|
|
||||||
from pathlib import Path
|
|
||||||
import time
|
|
||||||
from typing import Any, Generator, Iterable, List, Optional
|
|
||||||
import neoapi
|
|
||||||
import cv2
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
from trap.frame_emitter import Camera, Frame, UrlOrPath
|
|
||||||
|
|
||||||
logger = logging.getLogger('video_source')
|
|
||||||
|
|
||||||
class VideoSource:
|
|
||||||
"""Video Frame generator
|
|
||||||
"""
|
|
||||||
def recv(self) -> Generator[Optional[cv2.typing.MatLike], Any, None]:
|
|
||||||
raise RuntimeError("Not implemented")
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
for i in self.recv():
|
|
||||||
yield i
|
|
||||||
|
|
||||||
|
|
||||||
class GigE(VideoSource):
|
|
||||||
def __init__(self, identifier=None):
|
|
||||||
|
|
||||||
self.camera = neoapi.Cam()
|
|
||||||
# self.camera.Connect('-B127')
|
|
||||||
self.camera.Connect(identifier)
|
|
||||||
# Default buffer mode, streaming, always returns latest frame
|
|
||||||
self.camera.SetImageBufferCount(10)
|
|
||||||
# neoAPI docs: Setting the neoapi.Cam.SetImageBufferCycleCount()to one ensures that all buffers but one are given back to the neoAPI to be re-cycled and never given to the user by the neoapi.Cam.GetImage() method.
|
|
||||||
self.camera.SetImageBufferCycleCount(1)
|
|
||||||
# if self.camera.IsConnected():
|
|
||||||
# self.camera.f.PixelFormat.Set(neoapi.PixelFormat_RGB8)
|
|
||||||
# self.camera.f.BinningHorizontal.Set(2)
|
|
||||||
# self.camera.f.BinningVertical.Set(2)
|
|
||||||
self.pixfmt = self.camera.f.PixelFormat.Get()
|
|
||||||
|
|
||||||
def recv(self):
|
|
||||||
while True:
|
|
||||||
if not self.camera.IsConnected():
|
|
||||||
return
|
|
||||||
|
|
||||||
i = self.camera.GetImage(0)
|
|
||||||
if i.IsEmpty():
|
|
||||||
time.sleep(.01)
|
|
||||||
continue
|
|
||||||
|
|
||||||
imgarray = i.GetNPArray()
|
|
||||||
if self.pixfmt == neoapi.PixelFormat_BayerRG12:
|
|
||||||
img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
|
|
||||||
elif self.pixfmt == neoapi.PixelFormat_BayerRG8:
|
|
||||||
img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
|
|
||||||
else:
|
|
||||||
img = cv2.cvtColor(imgarray, cv2.COLOR_BGR2RGB)
|
|
||||||
|
|
||||||
if img.dtype == np.uint16:
|
|
||||||
img = cv2.convertScaleAbs(img, alpha=(255.0/65535.0))
|
|
||||||
yield img
|
|
||||||
|
|
||||||
|
|
||||||
class SingleCvVideoSource(VideoSource):
|
|
||||||
def recv(self):
|
|
||||||
while True:
|
|
||||||
ret, img = self.video.read()
|
|
||||||
self.frame_idx+=1
|
|
||||||
|
|
||||||
# seek to 0 if video has finished. Infinite loop
|
|
||||||
if not ret:
|
|
||||||
# now loading multiple files
|
|
||||||
break
|
|
||||||
|
|
||||||
# frame = Frame(index=self.n, img=img, H=self.camera.H, camera=self.camera)
|
|
||||||
yield img
|
|
||||||
|
|
||||||
class RtspSource(SingleCvVideoSource):
|
|
||||||
def __init__(self, video_url: str | Path, camera: Camera = None):
|
|
||||||
gst = f"rtspsrc location={video_url} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=0 drop=true"
|
|
||||||
logger.info(f"Capture gstreamer (gst-launch-1.0): {gst}")
|
|
||||||
self.video = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)
|
|
||||||
self.frame_idx = 0
|
|
||||||
|
|
||||||
|
|
||||||
class FilelistSource(SingleCvVideoSource):
|
|
||||||
def __init__(self, video_sources: Iterable[UrlOrPath], camera: Camera = None, delay = True):
|
|
||||||
# store current position
|
|
||||||
self.video_sources = video_sources
|
|
||||||
self.camera = camera
|
|
||||||
self.video_path = None
|
|
||||||
self.video_nr = None
|
|
||||||
self.frame_count = None
|
|
||||||
self.frame_idx = None
|
|
||||||
self.n = 0
|
|
||||||
self.delay_generation = delay
|
|
||||||
|
|
||||||
def recv(self):
|
|
||||||
prev_time = time.time()
|
|
||||||
|
|
||||||
for video_nr, video_path in enumerate(self.video_sources):
|
|
||||||
self.video_path = video_path
|
|
||||||
self.video_nr = video_nr
|
|
||||||
logger.info(f"Play from '{str(video_path)}'")
|
|
||||||
video = cv2.VideoCapture(str(video_path))
|
|
||||||
fps = video.get(cv2.CAP_PROP_FPS)
|
|
||||||
target_frame_duration = 1./fps
|
|
||||||
self.frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
|
|
||||||
if self.frame_count < 0:
|
|
||||||
self.frame_count = math.inf
|
|
||||||
self.frame_idx = 0
|
|
||||||
# TODO)) Video offset
|
|
||||||
# if self.config.video_offset:
|
|
||||||
# logger.info(f"Start at frame {self.config.video_offset}")
|
|
||||||
# video.set(cv2.CAP_PROP_POS_FRAMES, self.config.video_offset)
|
|
||||||
# self.frame_idx = self.config.video_offset
|
|
||||||
|
|
||||||
while True:
|
|
||||||
ret, img = video.read()
|
|
||||||
self.frame_idx+=1
|
|
||||||
self.n+=1
|
|
||||||
|
|
||||||
# seek to 0 if video has finished. Infinite loop
|
|
||||||
if not ret:
|
|
||||||
# now loading multiple files
|
|
||||||
break
|
|
||||||
|
|
||||||
if "DATASETS/hof/" in str(video_path):
|
|
||||||
# hack to mask out area
|
|
||||||
cv2.rectangle(img, (0,0), (800,200), (0,0,0), -1)
|
|
||||||
|
|
||||||
|
|
||||||
# frame = Frame(index=self.n, img=img, H=self.camera.H, camera=self.camera)
|
|
||||||
yield img
|
|
||||||
|
|
||||||
if self.delay_generation:
|
|
||||||
# defer next loop
|
|
||||||
now = time.time()
|
|
||||||
time_diff = (now - prev_time)
|
|
||||||
if time_diff < target_frame_duration:
|
|
||||||
time.sleep(target_frame_duration - time_diff)
|
|
||||||
now += target_frame_duration - time_diff
|
|
||||||
|
|
||||||
prev_time = now
|
|
||||||
|
|
||||||
class CameraSource(SingleCvVideoSource):
|
|
||||||
def __init__(self, identifier: int, camera: Camera):
|
|
||||||
self.video = cv2.VideoCapture(identifier)
|
|
||||||
self.camera = camera
|
|
||||||
|
|
||||||
# TODO: make config variables
|
|
||||||
self.video.set(cv2.CAP_PROP_FRAME_WIDTH, int(self.camera.w))
|
|
||||||
self.video.set(cv2.CAP_PROP_FRAME_HEIGHT, int(self.camera.h))
|
|
||||||
# print("exposure!", video.get(cv2.CAP_PROP_AUTO_EXPOSURE))
|
|
||||||
self.video.set(cv2.CAP_PROP_FPS, self.camera.fps)
|
|
||||||
self.frame_idx = 0
|
|
||||||
|
|
||||||
def get_video_source(video_sources: List[UrlOrPath], camera: Camera):
|
|
||||||
if str(video_sources[0]).isdigit():
|
|
||||||
# numeric input is a CV camera
|
|
||||||
return CameraSource(int(str(video_sources[0])), camera)
|
|
||||||
elif video_sources[0].url.scheme == 'rtsp':
|
|
||||||
video_sources[0].url.hostname
|
|
||||||
return RtspSource(video_sources[0])
|
|
||||||
elif video_sources[0].url.scheme == 'gige':
|
|
||||||
return GigE(video_sources[0].url.hostname)
|
|
||||||
else:
|
|
||||||
return FilelistSource(video_sources)
|
|
||||||
# os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"
|
|
||||||
|
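The deleted get_video_source() dispatched on the first source: a purely numeric string became a CameraSource, an rtsp:// URL an RtspSource, a gige:// URL a GigE camera identified by hostname, and anything else a FilelistSource. A hedged usage sketch (the URL and the None camera shortcut are illustrative, not from the repository):

    from trap.video_sources import get_video_source
    from trap.frame_emitter import UrlOrPath

    sources = [UrlOrPath("rtsp://192.0.2.1/stream")]  # example address
    source = get_video_source(sources, None)          # camera only used by CameraSource
    for img in source:
        ...  # consume frames as they arrive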