from argparse import Namespace
from collections import defaultdict
import csv
from dataclasses import dataclass, field
import json
import logging
from multiprocessing import Event
from pathlib import Path
import pickle
import time
from typing import Optional

import numpy as np
import torch
import zmq
import cv2

from torchvision.models.detection import (
    retinanet_resnet50_fpn_v2, RetinaNet_ResNet50_FPN_V2_Weights,
    keypointrcnn_resnet50_fpn, KeypointRCNN_ResNet50_FPN_Weights,
    maskrcnn_resnet50_fpn_v2, MaskRCNN_ResNet50_FPN_V2_Weights,
)
from deep_sort_realtime.deepsort_tracker import DeepSort
from torchvision.models import ResNet50_Weights
from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack
from ultralytics import YOLO
from ultralytics.engine.results import Results as YOLOResult

from trap.frame_emitter import DetectionState, Frame, Detection, Track
from tsmoothie.smoother import KalmanSmoother, ConvolutionSmoother
import tsmoothie.smoother

# Detection = [int, int, int, int, float, int]
# Detections = [Detection]

# This is the dt that is also used by the scene.
# As this needs to be rather stable, try to adhere
# to it by waiting when we are faster. Value chosen based
# on a rough estimate of tracker duration.
TARGET_DT = .1

logger = logging.getLogger("trap.tracker")

DETECTOR_RETINANET = 'retinanet'
DETECTOR_MASKRCNN = 'maskrcnn'
DETECTOR_FASTERRCNN = 'fasterrcnn'
DETECTOR_YOLOv8 = 'ultralytics'

DETECTORS = [DETECTOR_RETINANET, DETECTOR_MASKRCNN, DETECTOR_FASTERRCNN, DETECTOR_YOLOv8]


class Tracker:
    def __init__(self, config: Namespace, is_running: Event):
        self.config = config
        self.is_running = is_running

        context = zmq.Context()
        self.frame_sock = context.socket(zmq.SUB)
        self.frame_sock.setsockopt(zmq.CONFLATE, 1)  # only keep latest frame. NB: make sure this comes BEFORE connect, otherwise it's ignored!!
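        # A SUB socket delivers nothing until a subscription is registered;
        # subscribing to b'' accepts every topic. Together with CONFLATE this
        # means the tracker always reads the newest frame instead of a backlog.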
        self.frame_sock.setsockopt(zmq.SUBSCRIBE, b'')
        self.frame_sock.connect(config.zmq_frame_addr)

        self.trajectory_socket = context.socket(zmq.PUB)
        self.trajectory_socket.setsockopt(zmq.CONFLATE, 1)  # only keep latest frame
        self.trajectory_socket.bind(config.zmq_trajectory_addr)

        # TODO: config device
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # TODO: support removal
        self.tracks = defaultdict(lambda: Track())

        logger.debug(f"Load tracker: {self.config.detector}")

        if self.config.detector == DETECTOR_RETINANET:
            # NB: despite the constant's name, this branch currently loads
            # Keypoint R-CNN; the actual RetinaNet lines are kept commented out.
            # weights = RetinaNet_ResNet50_FPN_V2_Weights.DEFAULT
            # self.model = retinanet_resnet50_fpn_v2(weights=weights, score_thresh=0.2)
            weights = KeypointRCNN_ResNet50_FPN_Weights.DEFAULT
            self.model = keypointrcnn_resnet50_fpn(weights=weights, box_score_thresh=0.35)
            self.model.to(self.device)
            # Put the model in inference mode
            self.model.eval()
            # Get the transforms for the model's weights
            self.preprocess = weights.transforms().to(self.device)
            self.mot_tracker = DeepSort(max_iou_distance=1, max_cosine_distance=0.5, max_age=15, nms_max_overlap=0.9,
                                        # embedder='torchreid', embedder_wts="../MODELS/osnet_x1_0_imagenet.pth"
                                        )
        elif self.config.detector == DETECTOR_MASKRCNN:
            weights = MaskRCNN_ResNet50_FPN_V2_Weights.COCO_V1
            self.model = maskrcnn_resnet50_fpn_v2(weights=weights, box_score_thresh=0.7)
            self.model.to(self.device)
            # Put the model in inference mode
            self.model.eval()
            # Get the transforms for the model's weights
            self.preprocess = weights.transforms().to(self.device)
            self.mot_tracker = DeepSort(n_init=5, max_iou_distance=1, max_cosine_distance=0.5, max_age=15, nms_max_overlap=0.9,
                                        # embedder='torchreid', embedder_wts="../MODELS/osnet_x1_0_imagenet.pth"
                                        )
        elif self.config.detector == DETECTOR_YOLOv8:
            self.model = YOLO('EXPERIMENTS/yolov8x.pt')
        else:
            raise RuntimeError(f"{self.config.detector} is not implemented yet. See --help")

        # homography = list(source.glob('*img2world.txt'))[0]
        self.H = self.config.H

        if self.config.smooth_tracks:
            logger.info("Smoother enabled")
            self.smoother = Smoother()
        else:
            logger.info("Smoother disabled (enable with --smooth-tracks)")

        logger.debug("Set up tracker")

    def track(self):
        prev_run_time = 0

        training_fp = None
        training_csv = None
        training_frames = 0

        if self.config.save_for_training is not None:
            if not isinstance(self.config.save_for_training, Path):
                raise ValueError("save-for-training should be a path")
            if not self.config.save_for_training.exists():
                logger.info(f"Making path for training data: {self.config.save_for_training}")
                self.config.save_for_training.mkdir(parents=True, exist_ok=False)
            else:
                logger.warning(f"Path for training data exists: {self.config.save_for_training}. Continuing, assuming that's OK.")
            training_fp = open(self.config.save_for_training / 'all.txt', 'w')
            # following https://github.com/StanfordASL/Trajectron-plus-plus/blob/master/experiments/pedestrians/process_data.py
            training_csv = csv.DictWriter(training_fp, fieldnames=['frame_id', 'track_id', 'l', 't', 'w', 'h', 'x', 'y', 'state'], delimiter='\t', quoting=csv.QUOTE_NONE)
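            # Each row of all.txt is one observation of one track: the pixel-space
            # bounding box (l, t, w, h), the homography-projected world position
            # (x, y) and the detection state, tab-separated as Trajectron++ expects.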
        prev_frame_i = -1

        while self.is_running.is_set():
            # This waiting for TARGET_DT causes frame loss. E.g. with TARGET_DT at .1
            # it skips exactly 1 frame on a 10 fps video (which it obviously should
            # not do), so for now, timing should move to the emitter.
            # this_run_time = time.time()
            # # logger.debug(f'test {prev_run_time - this_run_time}')
            # time.sleep(max(0, prev_run_time - this_run_time + TARGET_DT))
            # prev_run_time = time.time()

            zmq_ev = self.frame_sock.poll(timeout=2000)
            if not zmq_ev:
                logger.warning('skip poll after 2000ms')
                # when there's no data after the timeout, loop so that is_running is checked
                continue

            start_time = time.time()
            frame: Frame = self.frame_sock.recv_pyobj()  # frame delivery in current setup: 0.012-0.03s

            if frame.index > (prev_frame_i + 1):
                logger.warning(f"Dropped {frame.index - prev_frame_i - 1} frames ({frame.index=}, {prev_frame_i=})")
            prev_frame_i = frame.index

            # load homography into frame (TODO: should this be done in the emitter?)
            if frame.H is None:
                # logger.warning('Falling back to default H')
                # fallback: load configured H
                frame.H = self.H

            # logger.info(f"Frame delivery delay = {time.time()-frame.time}s")

            if self.config.detector == DETECTOR_YOLOv8:
                detections: list[Detection] = self._yolov8_track(frame)
            else:
                detections: list[Detection] = self._resnet_track(frame.img, scale=1)

            # Store detections into tracklets
            projected_coordinates = []
            for detection in detections:
                track = self.tracks[detection.track_id]
                track.track_id = detection.track_id  # for new tracks
                track.history.append(detection)  # add to history
                # projected_coordinates.append(track.get_projected_history(self.H))  # then get full history

                # TODO: handle occlusions and disappearance
                # if len(track.history) > 30:  # retain 90 tracks for 90 frames
                #     track.history.pop(0)

            # trajectories = {}
            # for detection in detections:
            #     tid = str(detection.track_id)
            #     track = self.tracks[detection.track_id]
            #     coords = track.get_projected_history(self.H)  # get full history
            #     trajectories[tid] = {
            #         "id": tid,
            #         "det_conf": detection.conf,
            #         "bbox": detection.to_ltwh(),
            #         "history": [{"x":c[0], "y":c[1]} for c in coords[0]] if not self.config.bypass_prediction else coords[0].tolist()  # already doubly nested, fine for test
            #     }
            active_track_ids = [d.track_id for d in detections]
            active_tracks = {t.track_id: t for t in self.tracks.values() if t.track_id in active_track_ids}
            # logger.info(f"{trajectories}")

            frame.tracks = active_tracks

            # if self.config.bypass_prediction:
            #     self.trajectory_socket.send_string(json.dumps(trajectories))
            # else:
            #     self.trajectory_socket.send(pickle.dumps(frame))
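            # Smoothing operates on the full bbox history of every active track, so
            # the published frame carries dampened boxes rather than raw detector jitter.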
            if self.config.smooth_tracks:
                frame = self.smoother.smooth_frame_tracks(frame)

            self.trajectory_socket.send_pyobj(frame)

            current_time = time.time()
            logger.debug(f"Trajectories: {len(active_tracks)}. Current frame delay = {current_time-frame.time}s (trajectories: {current_time - start_time}s)")

            # self.trajectory_socket.send_string(json.dumps(trajectories))
            # provide a {ID: {id: ID, history: [[x,y],[x,y],...]}}
            # TODO: provide a track object that actually keeps history (unlike tracker)

            # TODO: calculate fps (also for other loops, to see asynchronicity)
            # fpsfilter = fpsfilter*.9 + (1/dt)*.1  # trust value in order to stabilize fps display
            if training_csv:
                training_csv.writerows([{
                    'frame_id': round(frame.index * 10., 1),  # not really time
                    'track_id': t.track_id,
                    'l': t.history[-1].l,
                    't': t.history[-1].t,
                    'w': t.history[-1].w,
                    'h': t.history[-1].h,
                    'x': t.get_projected_history(frame.H)[-1][0],
                    'y': t.get_projected_history(frame.H)[-1][1],
                    'state': t.history[-1].state.value
                    # only keep _actual_ detections, no lost entries
                } for t in active_tracks.values()
                    # if t.history[-1].state != DetectionState.Lost
                ])
                training_frames += len(active_tracks)
            # print(time.time() - start_time)

        if training_fp:
            training_fp.close()
            lines = {
                'train': int(training_frames * .8),
                'val': int(training_frames * .12),
                'test': int(training_frames * .08),
            }
            logger.info(f"Splitting gathered data from {training_fp.name}")
            with open(training_fp.name, 'r') as source_fp:
                for name, line_nrs in lines.items():
                    dir_path = self.config.save_for_training / name
                    dir_path.mkdir(exist_ok=True)
                    file = dir_path / 'tracked.txt'
                    logger.debug(f"- Write {line_nrs} lines to {file}")
                    with file.open('w') as target_fp:
                        for i in range(line_nrs):
                            target_fp.write(source_fp.readline())

        logger.info('Stopping')

    def _yolov8_track(self, frame: Frame) -> list[Detection]:
        results: list[YOLOResult] = self.model.track(frame.img, persist=True, tracker="bytetrack.yaml", verbose=False)
        if results[0].boxes is None or results[0].boxes.id is None:
            # work around https://github.com/ultralytics/ultralytics/issues/5968
            return []
        return [Detection(track_id, bbox[0]-.5*bbox[2], bbox[1]-.5*bbox[3], bbox[2], bbox[3], 1, DetectionState.Confirmed, frame.index)
                for bbox, track_id in zip(results[0].boxes.xywh.cpu(), results[0].boxes.id.int().cpu().tolist())]

    def _resnet_track(self, img, scale: float = 1) -> list[Detection]:
        if scale != 1:
            dsize = (int(img.shape[1] * scale), int(img.shape[0] * scale))
            img = cv2.resize(img, dsize)
        detections = self._resnet_detect_persons(img)
        tracks: list[DeepsortTrack] = self.mot_tracker.update_tracks(detections, frame=img)
        return [Detection.from_deepsort(t).get_scaled(1/scale) for t in tracks]

    def _resnet_detect_persons(self, frame) -> list[Detection]:
        t = torch.from_numpy(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        # change axes of the loaded image to be compatible with torchvision.io.read_image
        # (which uses C,H,W format instead of H,W,C)
        t = t.permute(2, 0, 1)

        batch = self.preprocess(t)[None, :].to(self.device)
        # no_grad can be used for inference, should be slightly faster
        with torch.no_grad():
            predictions = self.model(batch)
        prediction = predictions[0]  # we feed only one frame at once
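        # In the COCO label map used by torchvision's detection models, category
        # id 1 is "person"; the mask below discards every other class.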
        # TODO: check if we need e.g. cyclist
        mask = prediction['labels'] == 1  # if we want more than one label: np.isin(prediction['labels'], [1,86])

        scores = prediction['scores'][mask]
        labels = prediction['labels'][mask]
        boxes = prediction['boxes'][mask]

        # TODO: introduce confidence and NMS suppression: https://github.com/cfotache/pytorch_objectdetecttrack/blob/master/PyTorch_Object_Tracking.ipynb
        # (which I _think_ we better do after filtering)
        # alternatively look at Soft-NMS: https://towardsdatascience.com/non-maximum-suppression-nms-93ce178e177c

        # dets: a numpy array of detections in the format [[x1,y1,x2,y2,score,label],[x1,y1,x2,y2,score,label],...]
        detections = np.array([np.append(bbox, [score, label]) for bbox, score, label in zip(boxes.cpu(), scores.cpu(), labels.cpu())])
        detections = self.detect_persons_deepsort_wrapper(detections)

        return detections

    @classmethod
    def detect_persons_deepsort_wrapper(cls, detections):
        """Make detect_persons() compatible with the deep_sort_realtime tracker
        by going from ltrb to ltwh and a different nesting.
        """
        return [([d[0], d[1], d[2]-d[0], d[3]-d[1]], d[4], d[5]) for d in detections]


def run_tracker(config: Namespace, is_running: Event):
    router = Tracker(config, is_running)
    router.track()


class Smoother:
    def __init__(self, window_len=2):
        self.smoother = ConvolutionSmoother(window_len=window_len, window_type='ones', copy=None)

    def smooth_frame_tracks(self, frame: Frame) -> Frame:
        new_tracks = []
        for track in frame.tracks.values():
            ls = [d.l for d in track.history]
            ts = [d.t for d in track.history]
            ws = [d.w for d in track.history]
            hs = [d.h for d in track.history]
            self.smoother.smooth(ls)
            ls = self.smoother.smooth_data[0]
            self.smoother.smooth(ts)
            ts = self.smoother.smooth_data[0]
            self.smoother.smooth(ws)
            ws = self.smoother.smooth_data[0]
            self.smoother.smooth(hs)
            hs = self.smoother.smooth_data[0]
            new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr)
                           for l, t, w, h, d in zip(ls, ts, ws, hs, track.history)]
            new_track = Track(track.track_id, new_history, track.predictor_history, track.predictions)
            new_tracks.append(new_track)
        frame.tracks = {t.track_id: t for t in new_tracks}
        return frame

    def smooth_frame_predictions(self, frame) -> Frame:
        for track in frame.tracks.values():
            new_predictions = []
            if not track.predictions:
                continue
            for prediction in track.predictions:
                xs = [d[0] for d in prediction]
                ys = [d[1] for d in prediction]
                self.smoother.smooth(xs)
                xs = self.smoother.smooth_data[0]
                self.smoother.smooth(ys)
                ys = self.smoother.smooth_data[0]
                smooth_prediction = [[x, y] for x, y in zip(xs, ys)]
                new_predictions.append(smooth_prediction)
            track.predictions = new_predictions
        return frame
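
# A minimal usage sketch, not part of the module proper: it assumes a frame
# emitter is already publishing Frame objects on the frame address, and that
# the Namespace attributes mirror the CLI options referenced above. All values
# here (addresses, identity homography) are placeholders for illustration only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    is_running = Event()
    is_running.set()

    config = Namespace(
        zmq_frame_addr="tcp://localhost:5555",   # placeholder address
        zmq_trajectory_addr="tcp://*:5556",      # placeholder address
        detector=DETECTOR_YOLOv8,                # one of the implemented DETECTORS
        H=np.eye(3),                             # stand-in homography; normally loaded from an img2world file
        smooth_tracks=True,
        save_for_training=None,                  # or a Path to collect Trajectron++ training data
    )
    run_tracker(config, is_running)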