Refactor to accommodate gige://IDENTIFIER inputs

Ruben van de Ven 2025-03-03 11:13:39 +01:00
parent c4d498d551
commit 978d94024e
5 changed files with 1425 additions and 1295 deletions

poetry.lock (generated, 1679 lines changed): file diff suppressed because it is too large.

pyproject.toml

@@ -44,6 +44,8 @@ bytetracker = { git = "https://github.com/rubenvandeven/bytetrack-pip" }
jsonlines = "^4.0.0"
tensorboardx = "^2.6.2.2"
shapely = "^1"
baumer-neoapi = {path = "../../Downloads/Baumer_neoAPI_1.4.1_lin_x86_64_python/wheel/baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl"}
qrcode = "^8.0"
[build-system]
requires = ["poetry-core"]
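
The baumer-neoapi entry is a path dependency on Baumer's vendor wheel, so it only resolves on machines where that relative path exists. A minimal sanity check after running poetry install, following Baumer's getting-started example (calling Connect() with no argument picks the first reachable camera):

    import neoapi

    camera = neoapi.Cam()
    camera.Connect()             # no argument: first reachable camera
    print(camera.IsConnected())  # True if a GigE camera was found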

trap/base.py (new file, 404 lines)

@@ -0,0 +1,404 @@
from __future__ import annotations

from enum import IntFlag
from itertools import cycle
import json
import logging
from pathlib import Path
import time
from typing import Iterable, List, Optional

import cv2
from dataclasses import dataclass, field
import numpy as np
from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack
from deep_sort_realtime.deep_sort.track import TrackState as DeepsortTrackState
from bytetracker.byte_tracker import STrack as ByteTrackTrack
from bytetracker.basetrack import TrackState as ByteTrackTrackState
import pandas as pd

from trap.utils import get_bins, inv_lerp, lerp
from trajectron.environment import Environment, Node, Scene
from urllib.parse import urlparse

logger = logging.getLogger('trap.base')
class UrlOrPath():
    def __init__(self, string):
        self.url = urlparse(str(string))

    def __str__(self) -> str:
        return self.url.geturl()

    def is_url(self) -> bool:
        return len(self.url.netloc) > 0

    def path(self) -> Path:
        if self.is_url():
            return Path(self.url.path)
        return Path(self.url.geturl())  # can include scheme, such as C:/
class Space(IntFlag):
    Image = 1        # As detected in the image
    Undistorted = 2  # After applying lens undistortion
    World = 4        # After lens undistortion and homography
    Render = 8       # View space of renderer
@dataclass
class Position:
    x: float
    y: float
    conf: float
    state: DetectionState
    frame_nr: int
    det_class: str
class DetectionState(IntFlag):
    Tentative = 1     # state before n_init (see DeepsortTrack)
    Confirmed = 2     # after tentative
    Lost = 4          # lost when DeepsortTrack.time_since_update > 0 but not Deleted
    Interpolated = 8  # a position estimated through interpolation of adjacent detections

    @classmethod
    def from_deepsort_track(cls, track: DeepsortTrack):
        if track.state == DeepsortTrackState.Tentative:
            return cls.Tentative
        if track.state == DeepsortTrackState.Confirmed:
            if track.time_since_update > 0:
                return cls.Lost
            return cls.Confirmed
        raise RuntimeError("Should not run into Deleted entries here")

    @classmethod
    def from_bytetrack_track(cls, track: ByteTrackTrack):
        if track.state == ByteTrackTrackState.New:
            return cls.Tentative
        if track.state == ByteTrackTrackState.Lost:
            return cls.Lost
        # if track.time_since_update > 0:
        if track.state == ByteTrackTrackState.Tracked:
            return cls.Confirmed
        raise RuntimeError("Should not run into Deleted entries here")
def H_from_path(path: Path):
    if path.suffix == '.json':
        with path.open('r') as fp:
            H = np.array(json.load(fp))
    else:
        H = np.loadtxt(path, delimiter=',')
    return H
@dataclass
class Camera:
    mtx: cv2.Mat
    dist: cv2.Mat
    w: float
    h: float
    H: cv2.Mat  # homography
    newcameramtx: cv2.Mat = field(init=False)
    roi: cv2.typing.Rect = field(init=False)
    fps: float

    def __post_init__(self):
        self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w, self.h), 1, (self.w, self.h))

    @classmethod
    def from_calibfile(cls, calibration_path, H, fps):
        with calibration_path.open('r') as fp:
            data = json.load(fp)
        # print(data)
        # print(data['camera_matrix'])
        # camera = {
        #     'camera_matrix': np.array(data['camera_matrix']),
        #     'dist_coeff': np.array(data['dist_coeff']),
        # }
        return cls(
            np.array(data['camera_matrix']),
            np.array(data['dist_coeff']),
            data['dim']['width'],
            data['dim']['height'],
            H, fps)

    @classmethod
    def from_paths(cls, calibration_path, h_path, fps):
        H = H_from_path(h_path)
        return cls.from_calibfile(calibration_path, H, fps)

    # def __init__(self, mtx, dist, w, h, H):
    #     self.mtx = mtx
    #     self.dist = dist
    #     self.w = w
    #     self.h = h
    #     self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(mtx, dist, (w,h), 1, (w,h))
    #     self.H = H  # homography
@dataclass
class Detection:
    track_id: str  # deepsort track id association
    l: int  # left - image space
    t: int  # top - image space
    w: int  # width - image space
    h: int  # height - image space
    conf: float  # object detector probability
    state: DetectionState
    frame_nr: int
    det_class: str

    def get_foot_coords(self) -> list[float]:
        return [self.l + 0.5 * self.w, self.t + self.h]

    @classmethod
    def from_deepsort(cls, dstrack: DeepsortTrack, frame_nr: int):
        return cls(dstrack.track_id, *dstrack.to_ltwh(), dstrack.det_conf, DetectionState.from_deepsort_track(dstrack), frame_nr, dstrack.det_class)

    @classmethod
    def from_bytetrack(cls, bstrack: ByteTrackTrack, frame_nr: int):
        return cls(bstrack.track_id, *bstrack.tlwh, bstrack.score, DetectionState.from_bytetrack_track(bstrack), frame_nr, bstrack.cls)

    def get_scaled(self, scale: float = 1):
        if scale == 1:
            return self
        return Detection(
            self.track_id,
            self.l * scale,
            self.t * scale,
            self.w * scale,
            self.h * scale,
            self.conf,
            self.state,
            self.frame_nr,
            self.det_class)

    def to_ltwh(self):
        return (int(self.l), int(self.t), int(self.w), int(self.h))

    def to_ltrb(self):
        return (int(self.l), int(self.t), int(self.l + self.w), int(self.t + self.h))
@dataclass
class Track:
    """A somewhat haphazard wrapper around the 'real' tracker, which provides
    a history the predictor can work with, so that we can deduce velocity
    and acceleration.
    """
    track_id: str = None
    history: List[Detection] = field(default_factory=list)
    predictor_history: Optional[list] = None  # in image space
    predictions: Optional[list] = None
    fps: int = 12  # TODO)) convert this to camera? That way it incorporates H and dist; alternatively, each track is attached as a whole to a space
    source: Optional[int] = None  # to keep track of processed tracks

    def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[Camera] = None) -> np.array:
        foot_coordinates = [d.get_foot_coords() for d in self.history]
        # TODO)) Undistort points before perspective transform
        if len(foot_coordinates):
            if camera:
                coords = cv2.undistortPoints(np.array([foot_coordinates]).astype('float32'), camera.mtx, camera.dist, None, camera.newcameramtx)
                coords = cv2.perspectiveTransform(np.array(coords), camera.H)
                return coords.reshape((coords.shape[0], 2))
            else:
                coords = cv2.perspectiveTransform(np.array([foot_coordinates]), H)
                return coords[0]
        return np.array([])

    def get_projected_history_as_dict(self, H, camera: Optional[Camera] = None) -> list[dict]:
        coords = self.get_projected_history(H, camera)
        return [{"x": c[0], "y": c[1]} for c in coords]

    def get_with_interpolated_history(self) -> Track:
        # new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr, d.det_class) for l, t, w, h, d in zip(ls, ts, ws, hs, track.history)]
        # new_track = Track(track.track_id, new_history, track.predictor_history, track.predictions)
        new_history = []
        for j in range(len(self.history)):
            a = self.history[j]
            new_history.append(Detection(a.track_id, a.l, a.t, a.w, a.h, a.conf, a.state, a.frame_nr, a.det_class))
            if j + 1 >= len(self.history):
                break
            b = self.history[j + 1]
            gap = b.frame_nr - a.frame_nr
            if gap < 1:
                logger.error(f"WARNING, gap between frames {a.frame_nr} -> {b.frame_nr} is negative?")
            if gap > 1:
                for g in range(1, gap):
                    l = lerp(a.l, b.l, g / gap)
                    t = lerp(a.t, b.t, g / gap)
                    w = lerp(a.w, b.w, g / gap)
                    h = lerp(a.h, b.h, g / gap)
                    conf = 0
                    state = DetectionState.Lost
                    frame_nr = a.frame_nr + g
                    new_history.append(Detection(a.track_id, l, t, w, h, conf, state, frame_nr, a.det_class))
        return Track(
            self.track_id,
            new_history,
            self.predictor_history,
            self.predictions,
            self.fps)
    def is_complete(self):
        diffs = [(b.frame_nr - a.frame_nr) for a, b in zip(self.history[:-1], self.history[1:])]
        # complete when all consecutive detections are exactly one frame apart
        return all([d == 1 for d in diffs])

    def get_sampled(self, step_size=1, offset=0):
        if not self.is_complete():
            t = self.get_with_interpolated_history()
        else:
            t = self
        return Track(
            t.track_id,
            t.history[offset::step_size],
            t.predictor_history,
            t.predictions,
            t.fps / step_size)
    def get_binned(self, bin_size, camera: Camera, bin_start=True):
        """
        For an experiment: what if we predict using only concrete positions, by mapping
        dx,dy to a grid. Thus prediction can be for 8 moves, or rather headings.
        See ~/notes/attachments example svg.
        """
        history = self.get_projected_history_as_dict(H=None, camera=camera)

        def round_to_grid_precision(x):
            factor = 1 / bin_size
            return round(x * factor) / factor

        new_history: List[dict] = []
        for i, (det0, det1) in enumerate(zip(history[:-1], history[1:])):
            if i == 0:
                new_history.append({
                    'x': round_to_grid_precision(det0['x']),
                    'y': round_to_grid_precision(det0['y'])
                } if bin_start else det0)
                continue

            if abs(det1['x'] - new_history[-1]['x']) < bin_size and abs(det1['y'] - new_history[-1]['y']) < bin_size:
                continue

            # det1 falls outside the box [-bin_size:+bin_size] around the last detection
            # 1. Interpolate the exact point between det0 and det1 where this happens
            if abs(det1['x'] - new_history[-1]['x']) >= bin_size:
                if det1['x'] - new_history[-1]['x'] >= bin_size:
                    # det1 right of last
                    x = new_history[-1]['x'] + bin_size
                    f = inv_lerp(det0['x'], det1['x'], x)
                elif new_history[-1]['x'] - det1['x'] >= bin_size:
                    # det1 left of last
                    x = new_history[-1]['x'] - bin_size
                    f = inv_lerp(det0['x'], det1['x'], x)
                y = lerp(det0['y'], det1['y'], f)
            if abs(det1['y'] - new_history[-1]['y']) >= bin_size:
                if det1['y'] - new_history[-1]['y'] >= bin_size:
                    # det1 beyond last in +y
                    y = new_history[-1]['y'] + bin_size
                    f = inv_lerp(det0['y'], det1['y'], y)
                elif new_history[-1]['y'] - det1['y'] >= bin_size:
                    # det1 beyond last in -y
                    y = new_history[-1]['y'] - bin_size
                    f = inv_lerp(det0['y'], det1['y'], y)
                x = lerp(det0['x'], det1['x'], f)

            # 2. Find the closest point on the rectangle (its four corners, or 4 midpoints)
            points = get_bins(bin_size)
            points = [[new_history[-1]['x'] + p[0], new_history[-1]['y'] + p[1]] for p in points]
            distances = [np.linalg.norm([p[0] - x, p[1] - y]) for p in points]
            closest = np.argmin(distances)
            point = points[closest]
            new_history.append({'x': point[0], 'y': point[1]})
            # todo Offsets to points: [history for in points]
        return new_history
    def to_trajectron_node(self, camera: Camera, env: Environment) -> Node:
        positions = self.get_projected_history(None, camera)
        velocity = np.gradient(positions, 1 / self.fps, axis=0)
        acceleration = np.gradient(velocity, 1 / self.fps, axis=0)

        new_first_idx = self.history[0].frame_nr
        data_columns = pd.MultiIndex.from_product([['position', 'velocity', 'acceleration'], ['x', 'y']])

        # vx = derivative_of(x, scene.dt)
        # vy = derivative_of(y, scene.dt)
        # ax = derivative_of(vx, scene.dt)
        # ay = derivative_of(vy, scene.dt)

        data_dict = {
            ('position', 'x'): positions[:, 0],
            ('position', 'y'): positions[:, 1],
            ('velocity', 'x'): velocity[:, 0],
            ('velocity', 'y'): velocity[:, 1],
            ('acceleration', 'x'): acceleration[:, 0],
            ('acceleration', 'y'): acceleration[:, 1]
        }

        node_data = pd.DataFrame(data_dict, columns=data_columns)
        return Node(node_type=env.NodeType.PEDESTRIAN, node_id=self.track_id, data=node_data, first_timestep=new_first_idx)
@dataclass
class Frame:
    index: int
    img: np.array
    time: float = field(default_factory=lambda: time.time())
    tracks: Optional[dict[str, Track]] = None
    H: Optional[np.array] = None
    camera: Optional[Camera] = None
    maps: Optional[List[cv2.Mat]] = None

    def aslist(self) -> dict:
        # despite the name, returns a dict keyed by track_id
        return {t.track_id:
                {
                    'id': t.track_id,
                    'history': t.get_projected_history(self.H).tolist(),
                    'det_conf': t.history[-1].conf,
                    # 'det_conf': trajectory_data[node.id]['det_conf'],
                    # 'bbox': trajectory_data[node.id]['bbox'],
                    # 'history': history.tolist(),
                    'predictions': t.predictions
                } for t in self.tracks.values()}

    def without_img(self):
        return Frame(self.index, None, self.time, self.tracks, self.H, self.camera, self.maps)
def video_src_from_config(config) -> Iterable[UrlOrPath]:
    if config.video_loop:
        video_srcs: Iterable[UrlOrPath] = cycle(config.video_src)
    else:
        video_srcs: Iterable[UrlOrPath] = config.video_src
    return video_srcs
@dataclass
class Trajectory:
    # TODO)) Replace history and predictions in Track with Trajectory
    space: Space
    fps: int = 12
    points: List[Detection] = field(default_factory=list)

    def __iter__(self):
        for d in self.points:
            yield d
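
The UrlOrPath wrapper is what lets the new gige:// scheme travel through the existing video_src plumbing: urlparse() preserves scheme and netloc, so downstream code can branch on them. A small sketch (the camera serial below is a made-up placeholder):

    from trap.base import UrlOrPath

    src = UrlOrPath("gige://700001234567")  # placeholder identifier
    assert src.is_url()                     # netloc is set, so this counts as a URL
    print(src.url.scheme)                   # 'gige'
    print(src.url.hostname)                 # '700001234567', handed to the GigE source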

trap/frame_emitter.py

@@ -25,8 +25,10 @@ from bytetracker.basetrack import TrackState as ByteTrackTrackState
from trajectron.environment import Environment, Node, Scene
from urllib.parse import urlparse
from trap.base import *
from trap.utils import get_bins
from trap.utils import inv_lerp, lerp
from trap.video_sources import get_video_source
logger = logging.getLogger('trap.frame_emitter')
@@ -56,380 +58,8 @@ class DataclassJSONEncoder(json.JSONEncoder):
            return super().default(o)
(380 lines removed: the UrlOrPath, Space, DetectionState, H_from_path, Camera, Position, Detection, Trajectory, Track, Frame and video_src_from_config definitions, which this commit moves to trap/base.py; see the new file above.)
class FrameEmitter:
    '''

@@ -458,92 +88,19 @@
    def emit_video(self, timer_counter):
        i = 0
        delay_generation = False
        for video_path in self.video_srcs:
            logger.info(f"Play from '{str(video_path)}'")
            if str(video_path).isdigit():
                # numeric input is a CV camera
                video = cv2.VideoCapture(int(str(video_path)))
                # TODO: make config variables
                video.set(cv2.CAP_PROP_FRAME_WIDTH, int(self.config.camera.w))
                video.set(cv2.CAP_PROP_FRAME_HEIGHT, int(self.config.camera.h))
                print("exposure!", video.get(cv2.CAP_PROP_AUTO_EXPOSURE))
                video.set(cv2.CAP_PROP_FPS, 5)
                fps = 5
            elif video_path.url.scheme == 'rtsp':
                gst = f"rtspsrc location={video_path} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=0 drop=true"
                logger.info(f"Capture gstreamer (gst-launch-1.0): {gst}")
                video = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)
                fps = 12
            else:
                # os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"
                video = cv2.VideoCapture(str(video_path))
                delay_generation = True
                fps = video.get(cv2.CAP_PROP_FPS)
            target_frame_duration = 1. / fps
            logger.info(f"Emit frames at {fps} fps")
            if self.config.video_offset:
                logger.info(f"Start at frame {self.config.video_offset}")
                video.set(cv2.CAP_PROP_POS_FRAMES, self.config.video_offset)
                i = self.config.video_offset
        source = get_video_source(self.video_srcs, self.config.camera)
        for i, img in enumerate(source):
            with timer_counter.get_lock():
                timer_counter.value += 1
            # if '-' in video_path.path().stem:
            #     path_stem = video_path.stem[:video_path.stem.rfind('-')]
            # else:
            #     path_stem = video_path.stem
            # path_stem += "-homography"
            # homography_path = video_path.with_stem(path_stem).with_suffix('.txt')
            # logger.info(f'check homography file {homography_path}')
            # if homography_path.exists():
            #     logger.info(f'Found custom homography file! Using {homography_path}')
            #     video_H = np.loadtxt(homography_path, delimiter=',')
            # else:
            #     video_H = None
            video_H = self.config.camera.H
            frame = Frame(i, img=img, H=self.config.camera.H, camera=self.config.camera)
            prev_time = time.time()
        while self.is_running.is_set():
            with timer_counter.get_lock():
                timer_counter.value += 1
            ret, img = video.read()
            # seek to 0 if video has finished. Infinite loop
            if not ret:
                # now loading multiple files
                break
                # video.set(cv2.CAP_PROP_POS_FRAMES, 0)
                # ret, img = video.read()
                # assert ret is not False  # not really error proof...
            if "DATASETS/hof/" in str(video_path):
                # hack to mask out area
                cv2.rectangle(img, (0, 0), (800, 200), (0, 0, 0), -1)
            frame = Frame(index=i, img=img, H=self.config.H, camera=self.config.camera)
            # TODO: this is very dirty, need to find another way.
            # perhaps multiprocessing Array?
            self.frame_noimg_sock.send(pickle.dumps(frame.without_img()))
            self.frame_sock.send(pickle.dumps(frame))
            # only delay consuming the next frame when using a file.
            # Otherwise, go ASAP
            if delay_generation:
                # defer next loop
                now = time.time()
                time_diff = (now - prev_time)
                if time_diff < target_frame_duration:
                    time.sleep(target_frame_duration - time_diff)
                    now += target_frame_duration - time_diff
                prev_time = now
            i += 1
            # TODO: this is very dirty, need to find another way.
            # perhaps multiprocessing Array?
            self.frame_noimg_sock.send(pickle.dumps(frame.without_img()))
            self.frame_sock.send(pickle.dumps(frame))
        if not self.is_running.is_set():
            # if not running, also break out of infinite generator loop
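
Untangled from the interleaved hunk above, the post-refactor emit_video reduces to roughly the following (a reconstruction from the added lines, not the verbatim file):

    def emit_video(self, timer_counter):
        # All capture specifics now live in get_video_source();
        # the emitter only wraps images into Frames and publishes them.
        source = get_video_source(self.video_srcs, self.config.camera)
        for i, img in enumerate(source):
            with timer_counter.get_lock():
                timer_counter.value += 1
            frame = Frame(i, img=img, H=self.config.camera.H, camera=self.config.camera)
            self.frame_noimg_sock.send(pickle.dumps(frame.without_img()))
            self.frame_sock.send(pickle.dumps(frame))
            if not self.is_running.is_set():
                # if not running, also break out of the generator loop
                break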

trap/video_sources.py (new file, 170 lines)

@@ -0,0 +1,170 @@
import logging
import math
from pathlib import Path
import time
from typing import Any, Generator, Iterable, List, Optional

import neoapi
import cv2
import numpy as np

from trap.frame_emitter import Camera, Frame, UrlOrPath

logger = logging.getLogger('video_source')
class VideoSource:
    """Video Frame generator
    """

    def recv(self) -> Generator[Optional[cv2.typing.MatLike], Any, None]:
        raise RuntimeError("Not implemented")

    def __iter__(self):
        for i in self.recv():
            yield i
class GigE(VideoSource):
    def __init__(self, identifier=None):
        self.camera = neoapi.Cam()
        # self.camera.Connect('-B127')
        self.camera.Connect(identifier)

        # Default buffer mode, streaming, always returns latest frame
        self.camera.SetImageBufferCount(10)
        # neoAPI docs: setting neoapi.Cam.SetImageBufferCycleCount() to one ensures that all buffers
        # but one are given back to the neoAPI to be re-cycled and never given to the user by the
        # neoapi.Cam.GetImage() method.
        self.camera.SetImageBufferCycleCount(1)

        # if self.camera.IsConnected():
        #     self.camera.f.PixelFormat.Set(neoapi.PixelFormat_RGB8)
        #     self.camera.f.BinningHorizontal.Set(2)
        #     self.camera.f.BinningVertical.Set(2)
        self.pixfmt = self.camera.f.PixelFormat.Get()

    def recv(self):
        while True:
            if not self.camera.IsConnected():
                return

            i = self.camera.GetImage(0)
            if i.IsEmpty():
                time.sleep(.01)
                continue

            imgarray = i.GetNPArray()
            if self.pixfmt == neoapi.PixelFormat_BayerRG12:
                img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
            elif self.pixfmt == neoapi.PixelFormat_BayerRG8:
                img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
            else:
                img = cv2.cvtColor(imgarray, cv2.COLOR_BGR2RGB)

            if img.dtype == np.uint16:
                img = cv2.convertScaleAbs(img, alpha=(255.0 / 65535.0))
            yield img
class SingleCvVideoSource(VideoSource):
    def recv(self):
        while True:
            ret, img = self.video.read()
            self.frame_idx += 1
            # stop on read failure; the caller decides whether to loop
            if not ret:
                break
            # frame = Frame(index=self.n, img=img, H=self.camera.H, camera=self.camera)
            yield img
class RtspSource(SingleCvVideoSource):
    def __init__(self, video_url: str | Path, camera: Camera = None):
        gst = f"rtspsrc location={video_url} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=0 drop=true"
        logger.info(f"Capture gstreamer (gst-launch-1.0): {gst}")
        self.video = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)
        self.frame_idx = 0
class FilelistSource(SingleCvVideoSource):
    def __init__(self, video_sources: Iterable[UrlOrPath], camera: Camera = None, delay=True):
        # store current position
        self.video_sources = video_sources
        self.camera = camera
        self.video_path = None
        self.video_nr = None
        self.frame_count = None
        self.frame_idx = None
        self.n = 0
        self.delay_generation = delay

    def recv(self):
        prev_time = time.time()

        for video_nr, video_path in enumerate(self.video_sources):
            self.video_path = video_path
            self.video_nr = video_nr
            logger.info(f"Play from '{str(video_path)}'")
            video = cv2.VideoCapture(str(video_path))
            fps = video.get(cv2.CAP_PROP_FPS)
            target_frame_duration = 1. / fps
            self.frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
            if self.frame_count < 0:
                self.frame_count = math.inf
            self.frame_idx = 0

            # TODO)) Video offset
            # if self.config.video_offset:
            #     logger.info(f"Start at frame {self.config.video_offset}")
            #     video.set(cv2.CAP_PROP_POS_FRAMES, self.config.video_offset)
            #     self.frame_idx = self.config.video_offset

            while True:
                ret, img = video.read()
                self.frame_idx += 1
                self.n += 1

                # move on to the next file when this one has finished
                if not ret:
                    break

                if "DATASETS/hof/" in str(video_path):
                    # hack to mask out area
                    cv2.rectangle(img, (0, 0), (800, 200), (0, 0, 0), -1)

                # frame = Frame(index=self.n, img=img, H=self.camera.H, camera=self.camera)
                yield img

                if self.delay_generation:
                    # defer next loop
                    now = time.time()
                    time_diff = (now - prev_time)
                    if time_diff < target_frame_duration:
                        time.sleep(target_frame_duration - time_diff)
                        now += target_frame_duration - time_diff
                    prev_time = now
class CameraSource(SingleCvVideoSource):
    def __init__(self, identifier: int, camera: Camera):
        self.video = cv2.VideoCapture(identifier)
        self.camera = camera
        # TODO: make config variables
        self.video.set(cv2.CAP_PROP_FRAME_WIDTH, int(self.camera.w))
        self.video.set(cv2.CAP_PROP_FRAME_HEIGHT, int(self.camera.h))
        # print("exposure!", video.get(cv2.CAP_PROP_AUTO_EXPOSURE))
        self.video.set(cv2.CAP_PROP_FPS, self.camera.fps)
        self.frame_idx = 0
def get_video_source(video_sources: List[UrlOrPath], camera: Camera):
    if str(video_sources[0]).isdigit():
        # numeric input is a CV camera
        return CameraSource(int(str(video_sources[0])), camera)
    elif video_sources[0].url.scheme == 'rtsp':
        return RtspSource(video_sources[0])
    elif video_sources[0].url.scheme == 'gige':
        # gige://IDENTIFIER: the hostname part selects the GigE camera
        return GigE(video_sources[0].url.hostname)
    else:
        return FilelistSource(video_sources)
    # os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"
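
Putting the pieces together, a usage sketch of the new dispatcher (the identifier is a placeholder, and passing camera=None leans on the GigE branch ignoring that argument):

    from trap.base import UrlOrPath
    from trap.video_sources import get_video_source

    sources = [UrlOrPath("gige://700001234567")]     # placeholder identifier
    source = get_video_source(sources, camera=None)  # the GigE branch never touches camera
    for img in source:
        print(img.shape)  # RGB frames, e.g. (h, w, 3)
        break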