# trap/trap/tracker.py
from argparse import Namespace
from collections import defaultdict
import csv
from dataclasses import dataclass, field
import json
import logging
from math import nan
from multiprocessing import Event
import multiprocessing
from pathlib import Path
import pickle
import time
from typing import DefaultDict, Dict, Optional, List
import jsonlines
import numpy as np
import torch
import torchvision
import ultralytics
import zmq
import cv2
from torchvision.models.detection import retinanet_resnet50_fpn_v2, RetinaNet_ResNet50_FPN_V2_Weights, keypointrcnn_resnet50_fpn, KeypointRCNN_ResNet50_FPN_Weights, maskrcnn_resnet50_fpn_v2, MaskRCNN_ResNet50_FPN_V2_Weights, FasterRCNN_ResNet50_FPN_V2_Weights, fasterrcnn_resnet50_fpn_v2
from deep_sort_realtime.deepsort_tracker import DeepSort
from torchvision.models import ResNet50_Weights
from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack
from ultralytics import YOLO
from ultralytics.engine.results import Results as YOLOResult
from trap import timer
from trap.frame_emitter import Camera, DataclassJSONEncoder, DetectionState, Frame, Detection, Track
from bytetracker import BYTETracker
from tsmoothie.smoother import KalmanSmoother, ConvolutionSmoother
import tsmoothie.smoother
from datetime import datetime, timedelta
# Detection = [int, int, int, int, float, int]
# Detections = [Detection]
# This is the dt that is also used by the scene.
# As this needs to be rather stable, try to adhere
# to it by waiting when we are faster. Value chosen based
# on a rough estimate of tracker duration.
TARGET_DT = .1
logger = logging.getLogger("trap.tracker")
DETECTOR_RETINANET = 'retinanet'
DETECTOR_MASKRCNN = 'maskrcnn'
DETECTOR_FASTERRCNN = 'fasterrcnn'
DETECTOR_YOLOv8 = 'ultralytics'
TRACKER_DEEPSORT = 'deepsort'
TRACKER_BYTETRACK = 'bytetrack'
DETECTORS = [DETECTOR_RETINANET, DETECTOR_MASKRCNN, DETECTOR_FASTERRCNN, DETECTOR_YOLOv8]
TRACKERS = [TRACKER_DEEPSORT, TRACKER_BYTETRACK]
TRACKER_CONFIDENCE_MINIMUM = .2
TRACKER_BYTETRACK_MINIMUM = .1 # bytetrack can track items with a lower threshold
NON_MAXIMUM_SUPRESSION = 1
RCNN_SCALE = .4 # seems to have no impact on detections in the corners
def _yolov8_track(frame: Frame, model: YOLO, **kwargs) -> List[Detection]:
results: List[YOLOResult] = list(model.track(frame.img, persist=True, tracker="custom_bytetrack.yaml", verbose=False, conf=0.00001, **kwargs))
if results[0].boxes is None or results[0].boxes.id is None:
# work around https://github.com/ultralytics/ultralytics/issues/5968
return []
boxes = results[0].boxes.xywh.cpu()
track_ids = results[0].boxes.id.int().cpu().tolist()
classes = results[0].boxes.cls.int().cpu().tolist()
return [Detection(track_id, bbox[0]-.5*bbox[2], bbox[1]-.5*bbox[3], bbox[2], bbox[3], 1, DetectionState.Confirmed, frame.index, class_id) for bbox, track_id, class_id in zip(boxes, track_ids, classes)]
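# Note: model.track() returns centre-based xywh boxes, while Detection is built
# with a top-left-based ltwh box; e.g. a 100x50 box centred at (200, 300)
# becomes l=150, t=275, w=100, h=50.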
class Multifile():
def __init__(self, srcs: List[Path]):
self.srcs = srcs
self.g = self.__iter__()
self.current_file = None
@property
def name(self):
return ", ".join([s.name for s in self.srcs])
def __iter__(self):
for path in self.srcs:
self.current_file = path
with path.open('r') as fp:
for l in fp:
yield l
    def readline(self):
        return next(self.g)
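# A minimal usage sketch (file names hypothetical): Multifile chains several
# source files into a single line stream, with current_file tracking provenance.
#
#   sources = Multifile([Path('all-a.txt'), Path('all-b.txt')])
#   for line in sources:
#       print(sources.current_file, line)  # the file the current line came from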
FIELDNAMES = ['frame_id', 'track_id', 'l', 't', 'w', 'h', 'x', 'y', 'state', 'source']
class TrackFilter:
    def filter(self, track: Track, camera: Camera) -> bool:
        raise NotImplementedError()
def apply(self, tracks: List[Track], camera: Camera):
return [t for t in tracks if self.filter(t, camera)]
def apply_to_dict(self, tracks: Dict[str, Track], camera: Camera):
tracks = self.apply(tracks.values(), camera)
return {t.track_id: t for t in tracks}
class FinalDisplacementFilter(TrackFilter):
def __init__(self, min_displacement):
self.min_displacement = min_displacement
def filter(self, track: Track, camera: Camera):
history = track.get_projected_history(H=None, camera=camera)
displacement = np.linalg.norm(history[0]-history[-1])
return displacement > self.min_displacement
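# E.g. FinalDisplacementFilter(.2) keeps only tracks whose projected start and
# end points lie more than 0.2 apart (in projected space, assumed to be metres),
# dropping tracks that merely re-detect a static object.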
class TrackReader:
def __init__(self, path: Path, fps: int, include_blacklisted = False, exclude_whitelisted = False):
self.blacklist_file = path / "blacklist.jsonl"
self.whitelist_file = path / "whitelist.jsonl" # for skipping
self.tracks_file = path / "tracks.pkl"
# with self.tracks_file.open('r') as fp:
# tracks_dict: dict = json.load(fp)
with self.tracks_file.open('rb') as fp:
tracks: dict = pickle.load(fp)
if self.blacklist_file.exists():
with jsonlines.open(self.blacklist_file, 'r') as reader:
blacklist = [track_id for track_id in reader.iter(type=str)]
else:
blacklist = []
if self.whitelist_file.exists():
with jsonlines.open(self.whitelist_file, 'r') as reader:
whitelist = [track_id for track_id in reader.iter(type=str)]
else:
whitelist = []
self._tracks = { track_id: detection_values
for track_id, detection_values in tracks.items()
if (include_blacklisted or track_id not in blacklist) and
(not exclude_whitelisted or track_id not in whitelist)
}
self.fps = fps
def __len__(self):
return len(self._tracks)
def get(self, track_id):
return self._tracks[track_id]
# detection_values = self._tracks[track_id]
# history = []
# # for detection_values in
# source = None
# for detection_items in detection_values:
# d = dict(zip(FIELDNAMES, detection_items))
# history.append(Detection(
# d['track_id'],
# d['l'],
# d['t'],
# d['w'],
# d['h'],
# nan,
# d['state'],
# d['frame_id'],
# 1,
# ))
# source = int(d['source'])
# return Track(track_id, history, fps=self.fps, source=source)
def __iter__(self):
for track_id in self._tracks:
yield self.get(track_id)
def track_ids(self):
return list(self._tracks.keys())
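# A minimal usage sketch (path hypothetical): iterate all non-blacklisted
# tracks stored by TrainingDataWriter.
#
#   reader = TrackReader(Path('EXPERIMENTS/raw/run1'), fps=12)
#   for track in reader:
#       ...  # each item is a value from tracks.pkl (a Track, see rewrite_raw_track_files)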
def read_tracks_json(path: Path, fps):
"""
Reader for tracks.json produced by TrainingDataWriter
"""
reader = TrackReader(path, fps)
for t in reader:
yield t
class TrainingDataWriter:
def __init__(self, training_path: Optional[Path]):
if training_path is None:
self.path = None
return
if not isinstance(training_path, Path):
raise ValueError("save-for-training should be a path")
if not training_path.exists():
logger.info(f"Making path for training data: {training_path}")
training_path.mkdir(parents=True, exist_ok=False)
else:
logger.warning(f"Path for training-data exists: {training_path}. Continuing assuming that's ok.")
# following https://github.com/StanfordASL/Trajectron-plus-plus/blob/master/experiments/pedestrians/process_data.py
self.path = training_path
def __enter__(self):
if self.path:
d = datetime.now().isoformat(timespec="minutes")
self.training_fp = open(self.path / f'all-{d}.txt', 'w')
logger.debug(f"Writing tracker data to {self.training_fp.name}")
# following https://github.com/StanfordASL/Trajectron-plus-plus/blob/master/experiments/pedestrians/process_data.py
self.csv = csv.DictWriter(self.training_fp, fieldnames=FIELDNAMES, delimiter='\t', quoting=csv.QUOTE_NONE)
self.count = 0
return self
def add(self, frame: Frame, tracks: List[Track]):
if not self.path:
# skip if disabled
return
self.csv.writerows([{
'frame_id': round(frame.index * 10., 1), # not really time
'track_id': t.track_id,
'l': float(t.history[-1].l), # to float, so we're sure it's not a torch.tensor()
't': float(t.history[-1].t),
'w': float(t.history[-1].w),
'h': float(t.history[-1].h),
'x': t.get_projected_history(frame.H, frame.camera)[-1][0],
'y': t.get_projected_history(frame.H, frame.camera)[-1][1],
'state': t.history[-1].state.value
# only keep _actual_detections, no lost entries
} for t in tracks
# if t.history[-1].state != DetectionState.Lost
])
self.count += len(tracks)
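    # E.g. frame.index 123 is written as frame_id 1230.0; rewrite_raw_track_files
    # below divides by 10 again to recover the frame index, and treats
    # frame_id//10 as a seconds offset from the file's timestamp.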
def __exit__(self, exc_type, exc_value, exc_tb):
# ... ignore exception (type, value, traceback)
if not self.path:
return
self.training_fp.close()
rewrite_raw_track_files(self.path)
def rewrite_raw_track_files(path: Path):
source_files = list(sorted(path.glob("*.txt"))) # we loop twice, so need a list instead of generator
total = 0
sources = Multifile(source_files)
for line in sources:
if len(line) > 3: # make sure not to count empty lines
total += 1
destinations = {
'train': int(total * .8),
'val': int(total * .12),
'test': int(total * .08),
}
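    # E.g. with total = 1000 data lines this yields 800 train, 120 val and 80
    # test lines; int() truncation may leave a few trailing lines unassigned.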
logger.info(f"Splitting gathered data from {source_files}")
# for source_file in source_files:
tracks_file = path / 'tracks.json'
tracks_pkl = path / 'tracks.pkl'
tracks = defaultdict(lambda: Track())
offset = 0
max_track_id = 0
prev_file = None
# all-2024-11-12T13:30.txt
file_date = None
src_file_nr = 0
for name, line_nrs in destinations.items():
dir_path = path / name
dir_path.mkdir(exist_ok=True)
file = dir_path / 'tracked.txt'
logger.debug(f"- Write {line_nrs} lines to {file}")
with file.open('w') as target_fp:
for i in range(line_nrs):
line = sources.readline()
current_file = sources.current_file
if prev_file != current_file:
offset: int = max_track_id
logger.info(f'{name} - update offset {offset} ({sources.current_file})')
prev_file = current_file
src_file_nr += 1
try:
file_date = datetime.strptime(current_file.name, 'all-%Y-%m-%dT%H:%M.txt')
except ValueError as e:
logger.error(str(e))
file_date = None
parts = line.split('\t')
track_id = int(parts[1]) + offset
if file_date:
frame_date = file_date + timedelta(seconds = int(float(parts[0]))//10)
else:
frame_date = None
if track_id > max_track_id:
max_track_id = track_id
track_id = str(track_id)
target_fp.write("\t".join(parts))
parts = [float(p) for p in parts]
# ['frame_id', 'track_id', 'l', 't', 'w', 'h', 'x', 'y', 'state', 'source']
point = Detection(track_id, parts[2], parts[3], parts[4], parts[5], 1, DetectionState(int(parts[8])), int(parts[0]/10), 1)
# history = [
# for d in parts]
tracks[track_id].track_id = track_id
tracks[track_id].source = src_file_nr
tracks[track_id].history.append(point)
# tracks[track_id].append([
# int(parts[0] / 10),
# track_id,
# ] + parts[2:8] + [int(parts[8]), src_file_nr])
with tracks_file.open('w') as fp:
logger.info(f"Write {len(tracks)} tracks to {str(tracks_file)}")
json.dump(tracks, fp, cls=DataclassJSONEncoder, indent=2)
with tracks_pkl.open('wb') as fp:
logger.info(f"Write {len(tracks)} tracks to {str(tracks_pkl)}")
pickle.dump(dict(tracks), fp)
class TrackerWrapper():
def __init__(self, tracker):
self.tracker = tracker
    def track_detections(self, detections, img: cv2.Mat, frame_idx: int):
        raise NotImplementedError()
@classmethod
def init_type(cls, tracker_type: str):
if tracker_type == TRACKER_BYTETRACK:
return ByteTrackWrapper(BYTETracker(track_thresh=TRACKER_BYTETRACK_MINIMUM, match_thresh=TRACKER_CONFIDENCE_MINIMUM, frame_rate=12)) # TODO)) Framerate from emitter
else:
return DeepSortWrapper(DeepSort(n_init=5, max_age=30, nms_max_overlap=NON_MAXIMUM_SUPRESSION,
embedder='torchreid', embedder_wts="../MODELS/osnet_x1_0_imagenet.pth"
))
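# Usage sketch: TrackerWrapper.init_type(TRACKER_BYTETRACK) yields a
# ByteTrackWrapper; any other value falls back to DeepSort with a torchreid
# embedder (assuming the osnet weights exist at the relative path above).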
class DeepSortWrapper(TrackerWrapper):
def track_detections(self, detections, img: cv2.Mat, frame_idx: int):
detections = Tracker.detect_persons_deepsort_wrapper(detections)
tracks: List[DeepsortTrack] = self.tracker.update_tracks(detections, frame=img)
active_tracks = [t for t in tracks if t.is_confirmed()]
return [Detection.from_deepsort(t, frame_idx) for t in active_tracks]
    # update_tracks() signature: raw_detections, embeds=None, frame=None, today=None, others=None, instance_masks=None
class ByteTrackWrapper(TrackerWrapper):
def __init__(self, tracker: BYTETracker):
self.tracker = tracker
    def track_detections(self, detections: np.ndarray, img: cv2.Mat, frame_idx: int):
        # detections: a 2-D array with rows of [x1, y1, x2, y2, score, label]
if detections.shape[0] == 0:
detections = np.ndarray((0,0)) # needs to be 2-D
_ = self.tracker.update(detections)
removed_tracks = self.tracker.removed_stracks
active_tracks = [track for track in self.tracker.tracked_stracks if track.is_activated]
# TODO)) why was this in here:
# active_tracks = [track for track in active_tracks if track.start_frame < (self.tracker.frame_id - 5)]
return [Detection.from_bytetrack(track, frame_idx) for track in active_tracks]
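# ByteTrackWrapper expects detections as a 2-D array with rows of
# [x1, y1, x2, y2, score, label], as produced by _resnet_detect_persons below,
# e.g. np.array([[10., 20., 50., 80., .9, 1.]]) for a single person.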
class Tracker:
def __init__(self, config: Namespace):
self.config = config
# # TODO: config device
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# TODO: support removal
self.tracks: DefaultDict[str, Track] = defaultdict(lambda: Track())
logger.debug(f"Load tracker: {self.config.detector}")
conf = TRACKER_BYTETRACK_MINIMUM if self.config.tracker == TRACKER_BYTETRACK else TRACKER_CONFIDENCE_MINIMUM
if self.config.detector == DETECTOR_RETINANET:
# weights = RetinaNet_ResNet50_FPN_V2_Weights.DEFAULT
# self.model = retinanet_resnet50_fpn_v2(weights=weights, score_thresh=0.2)
weights = KeypointRCNN_ResNet50_FPN_Weights.COCO_V1
self.model = keypointrcnn_resnet50_fpn(weights=weights, box_score_thresh=conf)
self.model.to(self.device)
# Put the model in inference mode
self.model.eval()
# Get the transforms for the model's weights
self.preprocess = weights.transforms().to(self.device)
self.mot_tracker = TrackerWrapper.init_type(self.config.tracker)
elif self.config.detector == DETECTOR_FASTERRCNN:
# weights = RetinaNet_ResNet50_FPN_V2_Weights.DEFAULT
# self.model = retinanet_resnet50_fpn_v2(weights=weights, score_thresh=0.2)
weights = FasterRCNN_ResNet50_FPN_V2_Weights.COCO_V1
self.model = fasterrcnn_resnet50_fpn_v2(weights=weights, box_score_thresh=conf)
self.model.to(self.device)
# Put the model in inference mode
self.model.eval()
# Get the transforms for the model's weights
self.preprocess = weights.transforms().to(self.device)
self.mot_tracker = TrackerWrapper.init_type(self.config.tracker)
elif self.config.detector == DETECTOR_MASKRCNN:
weights = MaskRCNN_ResNet50_FPN_V2_Weights.COCO_V1
            self.model = maskrcnn_resnet50_fpn_v2(weights=weights, box_score_thresh=conf) # if we use ByteTrack we can work with low probability!
self.model.to(self.device)
# Put the model in inference mode
self.model.eval()
# Get the transforms for the model's weights
self.preprocess = weights.transforms().to(self.device)
# self.mot_tracker = DeepSort(n_init=5, max_iou_distance=1, max_cosine_distance=0.2, max_age=15, nms_max_overlap=0.9,
self.mot_tracker = TrackerWrapper.init_type(self.config.tracker)
elif self.config.detector == DETECTOR_YOLOv8:
# self.model = YOLO('EXPERIMENTS/yolov8x.pt')
self.model = YOLO('yolo11x.pt')
else:
raise RuntimeError(f"{self.config.detector} is not implemented yet. See --help")
# homography = list(source.glob('*img2world.txt'))[0]
# self.H = self.config.H
if self.config.smooth_tracks:
logger.info("Smoother enabled")
fps = 12 # TODO)) make configurable, or get from cam
self.smoother = Smoother(window_len=fps*5, convolution=False)
else:
logger.info("Smoother Disabled (enable with --smooth-tracks)")
logger.debug("Set up tracker")
def track_frame(self, frame: Frame):
if self.config.detector == DETECTOR_YOLOv8:
detections: List[Detection] = _yolov8_track(frame, self.model, classes=[0, 15, 16], imgsz=[1152, 640])
        else:
detections: List[Detection] = self._resnet_track(frame, scale = RCNN_SCALE)
for detection in detections:
track = self.tracks[detection.track_id]
track.track_id = detection.track_id # for new tracks
track.fps = frame.camera.fps
# track.fps = self.config.camera.fps # for new tracks
track.history.append(detection) # add to history
return detections
def track(self, is_running: Event, timer_counter: int = 0):
"""
Live tracking of frames coming in over zmq
"""
self.is_running = is_running
context = zmq.Context()
self.frame_sock = context.socket(zmq.SUB)
self.frame_sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!!
self.frame_sock.setsockopt(zmq.SUBSCRIBE, b'')
self.frame_sock.connect(self.config.zmq_frame_addr)
self.trajectory_socket = context.socket(zmq.PUB)
self.trajectory_socket.setsockopt(zmq.CONFLATE, 1) # only keep latest frame
self.trajectory_socket.bind(self.config.zmq_trajectory_addr)
prev_run_time = 0
# training_fp = None
# training_csv = None
# training_frames = 0
# if self.config.save_for_training is not None:
# if not isinstance(self.config.save_for_training, Path):
# raise ValueError("save-for-training should be a path")
# if not self.config.save_for_training.exists():
# logger.info(f"Making path for training data: {self.config.save_for_training}")
# self.config.save_for_training.mkdir(parents=True, exist_ok=False)
# else:
# logger.warning(f"Path for training-data exists: {self.config.save_for_training}. Continuing assuming that's ok.")
# training_fp = open(self.config.save_for_training / 'all.txt', 'w')
# # following https://github.com/StanfordASL/Trajectron-plus-plus/blob/master/experiments/pedestrians/process_data.py
# training_csv = csv.DictWriter(training_fp, fieldnames=['frame_id', 'track_id', 'l', 't', 'w', 'h', 'x', 'y', 'state'], delimiter='\t', quoting=csv.QUOTE_NONE)
prev_frame_i = -1
with TrainingDataWriter(self.config.save_for_training) as writer:
end_time = None
tracker_dt = None
w_time = None
displacement_filter = FinalDisplacementFilter(.2)
while self.is_running.is_set():
with timer_counter.get_lock():
timer_counter.value += 1
                # this waiting for target_dt causes frame loss. E.g. with target_dt at .1 it
                # skips exactly 1 frame on a 10 fps video (which it obviously should not do),
                # so for now, timing should move to the emitter
# this_run_time = time.time()
# # logger.debug(f'test {prev_run_time - this_run_time}')
# time.sleep(max(0, prev_run_time - this_run_time + TARGET_DT))
# prev_run_time = time.time()
poll_time = time.time()
zmq_ev = self.frame_sock.poll(timeout=2000)
if not zmq_ev:
logger.warning('skip poll after 2000ms')
# when there's no data after timeout, loop so that is_running is checked
continue
start_time = time.time()
frame: Frame = self.frame_sock.recv_pyobj() # frame delivery in current setup: 0.012-0.03s
if frame.index > (prev_frame_i+1):
logger.warning(f"Dropped {frame.index - prev_frame_i - 1} frames ({frame.index=}, {prev_frame_i=}) -- poll time {start_time-poll_time:.5f}")
if tracker_dt:
logger.warning(f"last loop took {tracker_dt} (finished {start_time - end_time:0.5f} ago, writing took {w_time-end_time} and finshed {start_time - w_time} ago).. {writer.path}")
prev_frame_i = frame.index
# load homography into frame (TODO: should this be done in emitter?)
if frame.H is None:
raise RuntimeError('Tracker no longer configures H')
# fallback: load configured H
# frame.H = self.H
# logger.info(f"Frame delivery delay = {time.time()-frame.time}s")
detections: List[Detection] = self.track_frame(frame)
# Store detections into tracklets
projected_coordinates = []
# now in track_frame()
# for detection in detections:
# track = self.tracks[detection.track_id]
# track.track_id = detection.track_id # for new tracks
# track.history.append(detection) # add to history
# projected_coordinates.append(track.get_projected_history(self.H)) # then get full history
                # TODO: handle occlusions and disappearance
# if len(track.history) > 30: # retain 90 tracks for 90 frames
# track.history.pop(0)
# trajectories = {}
# for detection in detections:
# tid = str(detection.track_id)
# track = self.tracks[detection.track_id]
# coords = track.get_projected_history(self.H) # get full history
# trajectories[tid] = {
# "id": tid,
# "det_conf": detection.conf,
# "bbox": detection.to_ltwh(),
# "history": [{"x":c[0], "y":c[1]} for c in coords[0]] if not self.config.bypass_prediction else coords[0].tolist() # already doubles nested, fine for test
# }
active_track_ids = [d.track_id for d in detections]
active_tracks = {t.track_id: t.get_with_interpolated_history() for t in self.tracks.values() if t.track_id in active_track_ids}
                active_tracks = displacement_filter.apply_to_dict(active_tracks, frame.camera)  # filter out tracks that only detect static objects
removable_tracks = []
for track_id, track in self.tracks.items():
if not len(track.history):
continue
detection: Detection = track.history[-1]
if detection.frame_nr < (frame.index - frame.camera.fps * 5):
removable_tracks.append(track_id)
for track_id in removable_tracks:
del self.tracks[track_id]
# active_tracks = {t.track_id: t for t in self.tracks.values() if t.track_id in active_track_ids}
# active_tracks = {t.track_id: t for t in self.tracks.values() if t.track_id in active_track_ids}
# logger.info(f"{trajectories}")
frame.tracks = active_tracks
# if self.config.bypass_prediction:
# self.trajectory_socket.send_string(json.dumps(trajectories))
# else:
# self.trajectory_socket.send(pickle.dumps(frame))
if self.config.smooth_tracks:
frame = self.smoother.smooth_frame_tracks(frame)
# print(f"send to {self.trajectory_socket}, {self.config.zmq_trajectory_addr}")
self.trajectory_socket.send_pyobj(frame.without_img()) # ditch image for faster passthrough
end_time = time.time()
tracker_dt = end_time - start_time
# having {end_time-frame.time} creates incidental delay... don't know why, maybe because of send?. So add n/a for now
# or is it {len(active_tracks)} or {tracker_dt}
# logger.debug(f"Trajectories: n/a. Current frame delay = n/a s (trajectories:s)")
# self.trajectory_socket.send_string(json.dumps(trajectories))
# provide a {ID: {id: ID, history: [[x,y],[x,y],...]}}
# TODO: provide a track object that actually keeps history (unlike tracker)
                # TODO calculate fps (also for other loops to see asynchronicity)
# fpsfilter=fpsfilter*.9+(1/dt)*.1 #trust value in order to stabilize fps display
writer.add(frame, active_tracks.values())
w_time = time.time()
logger.info('Stopping')
def _resnet_track(self, frame: Frame, scale: float = 1) -> List[Detection]:
img = frame.img
if scale != 1:
dsize = (int(img.shape[1] * scale), int(img.shape[0] * scale))
img = cv2.resize(img, dsize)
detections = self._resnet_detect_persons(img)
tracks: List[Detection] = self.mot_tracker.track_detections(detections, img, frame.index)
# active_tracks = [t for t in tracks if t.is_confirmed()]
return [d.get_scaled(1/scale) for d in tracks]
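    # Detections are made on the downscaled image and rescaled by 1/scale back
    # to the original frame; e.g. with scale=.4 a box corner detected at
    # (40, 40) maps back to (100, 100).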
    def _resnet_detect_persons(self, frame) -> np.ndarray:
t = torch.from_numpy(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        # change axes of the loaded image to be compatible with torchvision.io.read_image (which uses C,H,W format instead of H,W,C)
t = t.permute(2, 0, 1)
batch = self.preprocess(t)[None, :].to(self.device)
# no_grad can be used on inference, should be slightly faster
with torch.no_grad():
predictions = self.model(batch)
prediction = predictions[0] # we feed only one frame at once
# TODO: check if we need e.g. cyclist
mask = prediction['labels'] == 1 # if we want more than one label: np.isin(prediction['labels'], [1,86])
scores = prediction['scores'][mask]
# print(scores, prediction['labels'])
labels = prediction['labels'][mask]
boxes = prediction['boxes'][mask]
# print(prediction['scores'])
if NON_MAXIMUM_SUPRESSION < 1:
nms_mask = torch.zeros(scores.shape[0]).bool()
nms_keep_ids = torchvision.ops.nms(boxes, scores, NON_MAXIMUM_SUPRESSION)
nms_mask[nms_keep_ids] = True
            logger.debug(f"NMS kept {nms_keep_ids} of {scores.shape[0]} detections (mask: {nms_mask})")
scores = scores[nms_mask]
labels = labels[nms_mask]
boxes = boxes[nms_mask]
# TODO: introduce confidence and NMS supression: https://github.com/cfotache/pytorch_objectdetecttrack/blob/master/PyTorch_Object_Tracking.ipynb
# (which I _think_ we better do after filtering)
# alternatively look at Soft-NMS https://towardsdatascience.com/non-maximum-suppression-nms-93ce178e177c
# dets - a numpy array of detections in the format [[x1,y1,x2,y2,score, label],[x1,y1,x2,y2,score, label],...]
detections = np.array([np.append(bbox, [score, label]) for bbox, score, label in zip(boxes.cpu(), scores.cpu(), labels.cpu())])
return detections
@classmethod
def detect_persons_deepsort_wrapper(cls, detections):
"""make detect_persons() compatible with
deep_sort_realtime tracker by going from ltrb to ltwh and
different nesting
"""
return [([d[0], d[1], d[2]-d[0], d[3]-d[1]], d[4], d[5]) for d in detections]
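    # E.g. an ltrb detection [10, 20, 50, 80, .9, 1] becomes
    # ([10, 20, 40, 60], .9, 1): an ltwh bbox, a confidence and a class.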
def run_tracker(config: Namespace, is_running: Event, timer_counter):
router = Tracker(config)
router.track(is_running, timer_counter)
def run():
    # Stand-alone tracker entry point (parses args, then runs Tracker.track)
import argparse
argparser = argparse.ArgumentParser()
    argparser.add_argument('--zmq-frame-addr',
                           help='Manually specify communication addr for the frame messages',
                           type=str,
                           default="ipc:///tmp/feeds_frame")
    argparser.add_argument('--zmq-trajectory-addr',
                           help='Manually specify communication addr for the trajectory messages',
                           type=str,
                           default="ipc:///tmp/feeds_traj")
argparser.add_argument("--save-for-training",
help="Specify the path in which to save",
type=Path,
default=None)
argparser.add_argument("--detector",
help="Specify the detector to use",
type=str,
default=DETECTOR_YOLOv8,
choices=DETECTORS)
argparser.add_argument("--tracker",
help="Specify the detector to use",
type=str,
default=TRACKER_BYTETRACK,
choices=TRACKERS)
argparser.add_argument("--smooth-tracks",
help="Smooth the tracker tracks before sending them to the predictor",
action='store_true')
config = argparser.parse_args()
is_running = multiprocessing.Event()
is_running.set()
timer_counter = timer.Timer('frame_emitter')
router = Tracker(config)
router.track(is_running, timer_counter.iterations)
is_running.clear()
class Smoother:
def __init__(self, window_len=6, convolution=False):
        # for some reason this smoother messes up the predictions; probably it skews the points too much?
if convolution:
self.smoother = ConvolutionSmoother(window_len=window_len, window_type='hanning', copy=None)
else:
# "Unlike Kalman filtering, which focuses on predicting and updating the current state using historical measurements, Kalman smoothing enhances the accuracy of past state values"
# see https://medium.com/@shahalkp1/kalman-smoothing-using-tsmoothie-0175260464e5
self.smoother = KalmanSmoother(component='level_trend', component_noise={'level':0.02, 'season': .01, 'trend':0.02},n_seasons = 2, copy=None)
def smooth(self, points: List[float]):
self.smoother.smooth(points)
return self.smoother.smooth_data[0]
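    # Rough usage sketch: Smoother().smooth([0., 0., 1., 0., 0.]) returns a
    # same-length sequence with the spike dampened (exact values depend on the
    # Kalman parameters set in __init__).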
def smooth_track(self, track: Track) -> Track:
ls = [d.l for d in track.history]
ts = [d.t for d in track.history]
ws = [d.w for d in track.history]
hs = [d.h for d in track.history]
self.smoother.smooth(ls)
ls = self.smoother.smooth_data[0]
self.smoother.smooth(ts)
ts = self.smoother.smooth_data[0]
self.smoother.smooth(ws)
ws = self.smoother.smooth_data[0]
self.smoother.smooth(hs)
hs = self.smoother.smooth_data[0]
new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr, d.det_class) for l, t, w, h, d in zip(ls,ts,ws,hs, track.history)]
return Track(track.track_id, new_history, track.predictor_history, track.predictions, track.fps)
def smooth_frame_tracks(self, frame: Frame) -> Frame:
new_tracks = []
for track in frame.tracks.values():
new_track = self.smooth_track(track)
new_tracks.append(new_track)
frame.tracks = {t.track_id: t for t in new_tracks}
return frame
def smooth_frame_predictions(self, frame: Frame) -> Frame:
for track in frame.tracks.values():
new_predictions = []
if not track.predictions:
continue
for prediction in track.predictions:
xs = [d[0] for d in prediction]
ys = [d[1] for d in prediction]
self.smoother.smooth(xs)
xs = self.smoother.smooth_data[0]
self.smoother.smooth(ys)
ys = self.smoother.smooth_data[0]
smooth_prediction = [[x,y] for x, y in zip(xs, ys)]
new_predictions.append(smooth_prediction)
track.predictions = new_predictions
return frame