from __future__ import annotations from argparse import Namespace from dataclasses import dataclass, field import dataclasses from enum import IntFlag from itertools import cycle import json import logging from multiprocessing import Event from pathlib import Path import pickle import sys import time from typing import Iterable, List, Optional import numpy as np import cv2 import pandas as pd import zmq import os from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack from deep_sort_realtime.deep_sort.track import TrackState as DeepsortTrackState from bytetracker.byte_tracker import STrack as ByteTrackTrack from bytetracker.basetrack import TrackState as ByteTrackTrackState from trajectron.environment import Environment, Node, Scene from urllib.parse import urlparse from trap.utils import lerp logger = logging.getLogger('trap.frame_emitter') class DataclassJSONEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, np.ndarray): return o.tolist() if dataclasses.is_dataclass(o): d = dataclasses.asdict(o) if isinstance(o, Frame): # Don't send images over JSON del d['img'] return d return super().default(o) class UrlOrPath(): def __init__(self, string): self.url = urlparse(str(string)) def __str__(self) -> str: return self.url.geturl() def is_url(self) -> bool: return len(self.url.netloc) > 0 def path(self) -> Path: if self.is_url(): return Path(self.url.path) return Path(self.url.geturl()) # can include scheme, such as C:/ class Space(IntFlag): Image = 1 # As detected in the image Undistorted = 2 # After applying lense undistortiion World = 4 # After lens undistort and homography Render = 8 # View space of renderer class DetectionState(IntFlag): Tentative = 1 # state before n_init (see DeepsortTrack) Confirmed = 2 # after tentative Lost = 4 # lost when DeepsortTrack.time_since_update > 0 but not Deleted Interpolated = 8 # A position estimated through interpolation of adjecent detections @classmethod def from_deepsort_track(cls, track: DeepsortTrack): if track.state == DeepsortTrackState.Tentative: return cls.Tentative if track.state == DeepsortTrackState.Confirmed: if track.time_since_update > 0: return cls.Lost return cls.Confirmed raise RuntimeError("Should not run into Deleted entries here") @classmethod def from_bytetrack_track(cls, track: ByteTrackTrack): if track.state == ByteTrackTrackState.New: return cls.Tentative if track.state == ByteTrackTrackState.Lost: return cls.Lost # if track.time_since_update > 0: if track.state == ByteTrackTrackState.Tracked: return cls.Confirmed raise RuntimeError("Should not run into Deleted entries here") def H_from_path(path: Path): if path.suffix == '.json': with path.open('r') as fp: H = np.array(json.load(fp)) else: H = np.loadtxt(path, delimiter=',') return H @dataclass class Camera: mtx: cv2.Mat dist: cv2.Mat w: float h: float H: cv2.Mat # homography newcameramtx: cv2.Mat = field(init=False) roi: cv2.typing.Rect = field(init=False) fps: float def __post_init__(self): self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w,self.h), 1, (self.w,self.h)) @classmethod def from_calibfile(cls, calibration_path, H, fps): with calibration_path.open('r') as fp: data = json.load(fp) # print(data) # print(data['camera_matrix']) # camera = { # 'camera_matrix': np.array(data['camera_matrix']), # 'dist_coeff': np.array(data['dist_coeff']), # } return cls( np.array(data['camera_matrix']), np.array(data['dist_coeff']), data['dim']['width'], data['dim']['height'], H, fps) @classmethod def from_paths(cls, calibration_path, h_path, fps): H = H_from_path(h_path) return cls.from_calibfile(calibration_path, H, fps) # def __init__(self, mtx, dist, w, h, H): # self.mtx = mtx # self.dist = dist # self.w = w # self.h = h # self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(mtx, dist, (w,h), 1, (w,h)) # self.H = H # homography @dataclass class Position: x: float y: float conf: float state: DetectionState frame_nr: int det_class: str @dataclass class Detection: track_id: str # deepsort track id association l: int # left - image space t: int # top - image space w: int # width - image space h: int # height - image space conf: float # object detector probablity state: DetectionState frame_nr: int det_class: str def get_foot_coords(self) -> list[float, float]: return [self.l + 0.5 * self.w, self.t+self.h] @classmethod def from_deepsort(cls, dstrack: DeepsortTrack, frame_nr: int): return cls(dstrack.track_id, *dstrack.to_ltwh(), dstrack.det_conf, DetectionState.from_deepsort_track(dstrack), frame_nr, dstrack.det_class) @classmethod def from_bytetrack(cls, bstrack: ByteTrackTrack, frame_nr: int): return cls(bstrack.track_id, *bstrack.tlwh, bstrack.score, DetectionState.from_bytetrack_track(bstrack), frame_nr, bstrack.cls) def get_scaled(self, scale: float = 1): if scale == 1: return self return Detection( self.track_id, self.l*scale, self.t*scale, self.w*scale, self.h*scale, self.conf, self.state, self.frame_nr, self.det_class) def to_ltwh(self): return (int(self.l), int(self.t), int(self.w), int(self.h)) def to_ltrb(self): return (int(self.l), int(self.t), int(self.l+self.w), int(self.t+self.h)) @dataclass class Trajectory: # TODO)) Replace history and predictions in Track with Trajectory space: Space fps: int = 12 points: List[Detection] = field(default_factory=list) def __iter__(self): for d in self.points: yield d @dataclass class Track: """A bit of an haphazardous wrapper around the 'real' tracker to provide a history, with which the predictor can work, as we then can deduce velocity and acceleration. """ track_id: str = None history: List[Detection] = field(default_factory=list) predictor_history: Optional[list] = None # in image space predictions: Optional[list] = None fps: int = 12 source: Optional[int] = None # to keep track of processed tracks def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[Camera]= None) -> np.array: foot_coordinates = [d.get_foot_coords() for d in self.history] # TODO)) Undistort points before perspective transform if len(foot_coordinates): if camera: coords = cv2.undistortPoints(np.array([foot_coordinates]).astype('float32'), camera.mtx, camera.dist, None, camera.newcameramtx) coords = cv2.perspectiveTransform(np.array(coords),camera.H) return coords.reshape((coords.shape[0],2)) else: coords = cv2.perspectiveTransform(np.array([foot_coordinates]),H) return coords[0] return np.array([]) def get_projected_history_as_dict(self, H, camera: Optional[Camera]= None) -> dict: coords = self.get_projected_history(H, camera) return [{"x":c[0], "y":c[1]} for c in coords] def get_with_interpolated_history(self) -> Track: # new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr, d.det_class) for l, t, w, h, d in zip(ls,ts,ws,hs, track.history)] # new_track = Track(track.track_id, new_history, track.predictor_history, track.predictions) new_history = [] for j in range(len(self.history)): a = self.history[j] new_history.append(Detection(a.track_id, a.l, a.t, a.w, a.h, a.conf, a.state, a.frame_nr, a.det_class)) if j+1 >= len(self.history): break b = self.history[j+1] gap = b.frame_nr - a.frame_nr if gap < 1: logger.error(f"WARNING, gap between frames {a.frame_nr} -> {b.frame_nr} is negative?") if gap > 1: for g in range(1, gap): l = lerp(a.l, b.l, g/gap) t = lerp(a.t, b.t, g/gap) w = lerp(a.w, b.w, g/gap) h = lerp(a.h, b.h, g/gap) conf = 0 state = DetectionState.Lost frame_nr = a.frame_nr + g new_history.append(Detection(a.track_id, l, t, w, h, conf, state, frame_nr, a.det_class)) return Track( self.track_id, new_history, self.predictor_history, self.predictions, self.fps) def is_complete(self): diffs = [(b.frame_nr - a.frame_nr) for a,b in zip(self.history[:-1], self.history[1:])] return any([d != 1 for d in diffs]) def get_sampled(self, step_size = 1, offset=0): if not self.is_complete(): t = self.get_with_interpolated_history() else: t = self return Track( t.track_id, t.history[offset::step_size], t.predictor_history, t.predictions, t.fps/step_size) # def get_binned(self, bin_size=.5, remove_overlap=True): # """ # For an experiment: what if we predict using only concrete positions, by mapping # dx,dy to a grid. Thus prediction can be for 8 moves, or rather headings # see ~/notes/attachments example svg # """ # new_history: List[Detection] = [] # for i, (det0, det1) in enumerate(zip(self.history[:-1], self.history[1:]): # if i == 0: # new_history.append(det0) # continue # if abs(det1.x - new_history[-1].x) < bin_size or abs(det1.y - new_history[-1].y) < bin_size: # continue # # det1 falls outside of the box [-bin_size:+bin_size] around last detection # # 1. Interpolate exact point between det0 and det1 that this happens # if abs(det1.x - new_history[-1].x) >= bin_size: # if det1.x - new_history[-1].x >= bin_size: # # det1 left of last # x = new_history[-1].x + bin_size # f = inv_lerp(det0.x, det1.x, x) # elif new_history[-1].x - det1.x >= bin_size: # # det1 left of last # x = new_history[-1].x - bin_size # f = inv_lerp(det0.x, det1.x, x) # y = lerp(det0.y, det1.y, f) # if abs(det1.y - new_history[-1].y) >= bin_size: # if det1.y - new_history[-1].y >= bin_size: # # det1 left of last # y = new_history[-1].y + bin_size # f = inv_lerp(det0.y, det1.y, x) # elif new_history[-1].y - det1.y >= bin_size: # # det1 left of last # y = new_history[-1].y - bin_size # f = inv_lerp(det0.y, det1.y, x) # x = lerp(det0.x, det1.x, f) # # 2. Find closest point on rectangle (rectangle's four corners, or 4 midpoints) # points = [[bin_size, 0], [bin_size, bin_size], [0, bin_size], [-bin_size, bin_size], [-bin_size, 0], [-bin_size, -bin_size], [0, -bin_size], [bin_size, -bin_size]] # # todo Offsets to points:[ history for in points] def to_trajectron_node(self, camera: Camera, env: Environment) -> Node: positions = self.get_projected_history(None, camera) velocity = np.gradient(positions, 1/self.fps, axis=0) acceleration = np.gradient(velocity, 1/self.fps, axis=0) new_first_idx = self.history[0].frame_nr data_columns = pd.MultiIndex.from_product([['position', 'velocity', 'acceleration'], ['x', 'y']]) # vx = derivative_of(x, scene.dt) # vy = derivative_of(y, scene.dt) # ax = derivative_of(vx, scene.dt) # ay = derivative_of(vy, scene.dt) data_dict = { ('position', 'x'): positions[:,0], ('position', 'y'): positions[:,1], ('velocity', 'x'): velocity[:,0], ('velocity', 'y'): velocity[:,1], ('acceleration', 'x'): acceleration[:,0], ('acceleration', 'y'): acceleration[:,1] } node_data = pd.DataFrame(data_dict, columns=data_columns) return Node(node_type=env.NodeType.PEDESTRIAN, node_id=self.track_id, data=node_data, first_timestep=new_first_idx) @dataclass class Frame: index: int img: np.array time: float= field(default_factory=lambda: time.time()) tracks: Optional[dict[str, Track]] = None H: Optional[np.array] = None camera: Optional[Camera] = None def aslist(self) -> [dict]: return { t.track_id: { 'id': t.track_id, 'history': t.get_projected_history(self.H).tolist(), 'det_conf': t.history[-1].conf, # 'det_conf': trajectory_data[node.id]['det_conf'], # 'bbox': trajectory_data[node.id]['bbox'], # 'history': history.tolist(), 'predictions': t.predictions } for t in self.tracks.values() } def video_src_from_config(config) -> UrlOrPath: if config.video_loop: video_srcs: Iterable[UrlOrPath] = cycle(config.video_src) else: video_srcs: Iterable[UrlOrPath] = config.video_src return video_srcs class FrameEmitter: ''' Emit frame in a separate threat so they can be throttled, or thrown away when the rest of the system cannot keep up ''' def __init__(self, config: Namespace, is_running: Event) -> None: self.config = config self.is_running = is_running context = zmq.Context() # TODO: to make things faster, a multiprocessing.Array might be a tad faster: https://stackoverflow.com/a/65201859 self.frame_sock = context.socket(zmq.PUB) self.frame_sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. make sure to set BEFORE connect/bind self.frame_sock.bind(config.zmq_frame_addr) logger.info(f"Connection socket {config.zmq_frame_addr}") self.video_srcs = video_src_from_config(self.config) def emit_video(self): i = 0 delay_generation = False for video_path in self.video_srcs: logger.info(f"Play from '{str(video_path)}'") if str(video_path).isdigit(): # numeric input is a CV camera video = cv2.VideoCapture(int(str(video_path))) # TODO: make config variables video.set(cv2.CAP_PROP_FRAME_WIDTH, int(self.config.camera.w)) video.set(cv2.CAP_PROP_FRAME_HEIGHT, int(self.config.camera.h)) print("exposure!", video.get(cv2.CAP_PROP_AUTO_EXPOSURE)) video.set(cv2.CAP_PROP_FPS, 5) fps=5 elif video_path.url.scheme == 'rtsp': gst = f"rtspsrc location={video_path} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=1 drop=true" logger.info(f"Capture gstreamer (gst-launch-1.0): {gst}") video = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER) fps=12 else: # os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp" video = cv2.VideoCapture(str(video_path)) delay_generation = True fps = video.get(cv2.CAP_PROP_FPS) target_frame_duration = 1./fps logger.info(f"Emit frames at {fps} fps") if self.config.video_offset: logger.info(f"Start at frame {self.config.video_offset}") video.set(cv2.CAP_PROP_POS_FRAMES, self.config.video_offset) i = self.config.video_offset # if '-' in video_path.path().stem: # path_stem = video_path.stem[:video_path.stem.rfind('-')] # else: # path_stem = video_path.stem # path_stem += "-homography" # homography_path = video_path.with_stem(path_stem).with_suffix('.txt') # logger.info(f'check homography file {homography_path}') # if homography_path.exists(): # logger.info(f'Found custom homography file! Using {homography_path}') # video_H = np.loadtxt(homography_path, delimiter=',') # else: # video_H = None video_H = self.config.camera.H prev_time = time.time() while self.is_running.is_set(): ret, img = video.read() # seek to 0 if video has finished. Infinite loop if not ret: # now loading multiple files break # video.set(cv2.CAP_PROP_POS_FRAMES, 0) # ret, img = video.read() # assert ret is not False # not really error proof... if "DATASETS/hof/" in str(video_path): # hack to mask out area cv2.rectangle(img, (0,0), (800,200), (0,0,0), -1) frame = Frame(index=i, img=img, H=self.config.H, camera=self.config.camera) # TODO: this is very dirty, need to find another way. # perhaps multiprocessing Array? self.frame_sock.send(pickle.dumps(frame)) # only delay consuming the next frame when using a file. # Otherwise, go ASAP if delay_generation: # defer next loop now = time.time() time_diff = (now - prev_time) if time_diff < target_frame_duration: time.sleep(target_frame_duration - time_diff) now += target_frame_duration - time_diff prev_time = now i += 1 if not self.is_running.is_set(): # if not running, also break out of infinite generator loop break logger.info("Stopping") def run_frame_emitter(config: Namespace, is_running: Event): router = FrameEmitter(config, is_running) router.emit_video() is_running.clear()