fisheye support

Ruben van de Ven 2025-05-09 21:45:22 +02:00
parent ef24bb56f5
commit 7eaab23668
9 changed files with 428 additions and 143 deletions

View file

@@ -54,7 +54,7 @@ torch = [{ url = "https://download.pytorch.org/whl/cu113/torch-1.12.1%2Bcu113-cp
torchvision = [{ url = "https://download.pytorch.org/whl/cu113/torchvision-0.13.1%2Bcu113-cp310-cp310-linux_x86_64.whl", marker = "python_version ~= '3.10' and sys_platform == 'linux'" }]
pandas-helper-calc = { git = "https://github.com/scls19fr/pandas-helper-calc" }
bytetracker = { git = "https://github.com/rubenvandeven/bytetrack-pip" }
baumer-neoapi = { path = "../../Downloads/Baumer_neoAPI_1.4.1_lin_x86_64_python/wheel/baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl" }
baumer-neoapi = { path = "../../Downloads/Baumer_neoAPI_1.5.0_lin_x86_64_python/wheel/baumer_neoapi-1.5.0-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl" }
foucault = { git = "https://git.rubenvandeven.com/r/conductofconduct" }
opencv-python = {path="./opencv_python-4.10.0.84-cp310-cp310-linux_x86_64.whl"}

File diff suppressed because one or more lines are too long

View file

@@ -1,14 +1,14 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import argparse
from enum import IntFlag
from itertools import cycle
import json
import logging
from pathlib import Path
import time
from typing import Iterable, Optional
from typing import Iterable, Optional, Tuple, Union, List
import cv2
from dataclasses import dataclass, field
import dataclasses
@@ -19,11 +19,12 @@ from deep_sort_realtime.deep_sort.track import TrackState as DeepsortTrackState
from bytetracker.byte_tracker import STrack as ByteTrackTrack
from bytetracker.basetrack import TrackState as ByteTrackTrackState
import pandas as pd
from shapely import Point
from trap.utils import get_bins, inv_lerp, lerp
from trajectron.environment import Environment, Node, Scene
from urllib.parse import urlparse
from cv2.typing import MatLike
logger = logging.getLogger('trap.base')
class UrlOrPath():
@@ -100,31 +101,134 @@ def H_from_path(path: Path):
H = np.loadtxt(path, delimiter=',')
return H
@dataclass
class Camera:
mtx: cv2.Mat
dist: cv2.Mat
w: float
h: float
H: cv2.Mat # homography
newcameramtx: cv2.Mat = field(init=False)
roi: cv2.typing.Rect = field(init=False)
fps: float
PointList = List[Tuple[float, float]] | np.ndarray | cv2.typing.MatLike
def scale_homography(H: cv2.Mat, scale: float):
"""Transform the given matrix so that it immediately converts
the points to img space"""
new_H = H.copy()
new_H[:2] = H[:2] * scale
return new_H
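Scaling only the first two rows works because the homogeneous divisor comes from the untouched third row, so the projected x,y values simply come out multiplied by `scale`. A minimal sketch (illustration only, H is made up):

    import cv2
    import numpy as np

    H = np.array([[1.2, 0.1, 5.0],
                  [0.0, 0.9, 3.0],
                  [0.0, 0.001, 1.0]])
    pts = np.array([[[100.0, 200.0]]], dtype=np.float32)

    world = cv2.perspectiveTransform(pts, H)
    img = cv2.perspectiveTransform(pts, scale_homography(H, 100))
    assert np.allclose(img, world * 100)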
class DistortedCamera(ABC):
@abstractmethod
def undistort_img(self, img: MatLike):
return cv2.remap(img, self.map1, self.map2, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
def project_img(self, undistorted_img: MatLike, scale: float = 1.0):
w, h = undistorted_img.shape[1], undistorted_img.shape[0]
if scale != 1:
H = scale_homography(self.H, scale)
else:
H = self.H
return cv2.warpPerspective(undistorted_img, H,(w, h))
def img_to_world(self, img: MatLike, scale = 1.):
img = self.undistort_img(img)
return self.project_img(img, scale)
@abstractmethod
def undistort_points(self, distorted_points: PointList):
pass
def project_point(self, point):
return self.project_points([point])[0]
def project_points(self, points: PointList, scale: float = 1.0):
if scale != 1:
H = scale_homography(self.H, scale)
else:
H = self.H
coords = cv2.perspectiveTransform(np.array([points]),H)
if coords.shape[1:] == (1,2):
coords = np.reshape(coords, (coords.shape[0], 2))
return coords
def points_img_to_world(self, points: PointList, scale = 1.):
# undistort & project
coords = self.undistort_points(points)
coords = self.project_points(coords, scale)
return coords
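With that, the base class gives every camera the same two-step pipeline: subclass-specific undistortion followed by the shared homography projection. A usage sketch (paths, detections and fps are placeholders, not from this commit):

    from pathlib import Path

    camera = Camera.from_paths(Path('calibration.json'), Path('homography.csv'), fps=12)

    frame_img = cv2.imread('frame.png')
    top_down = camera.img_to_world(frame_img, scale=100)   # undistort, then warp with the scaled H

    feet = [(640.0, 710.0), (655.0, 712.0)]    # image-space detections
    world = camera.points_img_to_world(feet)   # undistort_points, then perspectiveTransform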
class FisheyeCamera(DistortedCamera):
def __init__(self, dim1, dim2, dim3, K, D, new_K, scaled_K, balance, H, fps):
# dimensions as per: https://medium.com/@kennethjiang/calibrate-fisheye-lens-using-opencv-part-2-13990f1b157f
self.dim1 = dim1 # original image
self.dim2 = dim2 # dimension of the box to keep after un-distorting the image; influenced by balance
self.dim3 = dim3 # Dimension of the final box where OpenCV will put the undistorted image.
self.K = K
self.D = D
self.new_K = new_K
self.scaled_K = scaled_K
self.balance = balance
self.H = H # Homography
self._R = np.eye(3)
self.fps = fps
self.map1, self.map2 = cv2.fisheye.initUndistortRectifyMap(self.scaled_K, self.D, self._R, self.new_K, self.dim3, cv2.CV_16SC2)
def undistort_img(self, img: MatLike):
return cv2.remap(img, self.map1, self.map2, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
def undistort_points(self, distorted_points: PointList):
points = cv2.fisheye.undistortPoints(np.array([distorted_points]).astype(np.float32), K=self.scaled_K, D=self.D, R=self._R, P=self.new_K)
return points[0]
@property
def projected_w(self):
return self.dim3[0]
@property
def projected_h(self):
return self.dim3[1]
@classmethod
def from_calibfile(cls, calibration_path, H, fps):
with calibration_path.open('r') as fp:
data = json.load(fp)
return cls(
data['dim1'],
data['dim2'],
data['dim3'],
np.array(data['K']),
np.array(data['D']),
np.array(data['new_K']),
np.array(data['scaled_K']),
data['balance'],
H, fps)
@classmethod
def from_paths(cls, calibration_path, h_path, fps):
H = H_from_path(h_path)
return cls.from_calibfile(calibration_path, H, fps)
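For context, the stored matrices can be produced along the lines of the post cited above, with K and D coming from an earlier cv2.fisheye.calibrate() run. A sketch with placeholder values:

    K = np.array([[1000.,    0., 1224.],
                  [   0., 1000., 1024.],
                  [   0.,    0.,    1.]])  # intrinsics from cv2.fisheye.calibrate (placeholder)
    D = np.zeros((4, 1))                   # k1..k4 fisheye distortion coefficients (placeholder)

    calib_dim = (2448, 2048)  # resolution the calibration images were shot at
    dim1 = (2448, 2048)       # resolution of the live stream
    dim2 = dim1               # box kept after undistorting; what `balance` trades off
    dim3 = dim1               # canvas the undistorted image is rendered onto
    balance = 0.5             # 0 = crop away all black borders, 1 = keep the full view

    scaled_K = K * dim1[0] / calib_dim[0]  # rescale K when stream and calibration resolution differ
    scaled_K[2][2] = 1.0
    new_K = cv2.fisheye.estimateNewCameraMatrixForUndistortRectify(
        scaled_K, D, dim2, np.eye(3), balance=balance)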
class Camera(DistortedCamera):
def __init__(self, mtx: cv2.Mat, dist: cv2.Mat, w: float, h: float, H: cv2.Mat, fps: float):
self.mtx = mtx
self.dist = dist
self.w = w
self.h = h
self.H = H
self.fps = fps
# no longer a dataclass, so derive the optimal new camera matrix here rather than in __post_init__ (which only dataclasses call)
self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w,self.h), 1, (self.w,self.h))
@classmethod
def from_calibfile(cls, calibration_path, H, fps):
with calibration_path.open('r') as fp:
data = json.load(fp)
# print(data)
# print(data['camera_matrix'])
# camera = {
# 'camera_matrix': np.array(data['camera_matrix']),
# 'dist_coeff': np.array(data['dist_coeff']),
# }
return cls(
np.array(data['camera_matrix']),
np.array(data['dist_coeff']),
@@ -132,18 +236,26 @@ class Camera:
data['dim']['height'],
H, fps)
@property
def projected_w(self):
return self.w
@property
def projected_h(self):
return self.h
@classmethod
def from_paths(cls, calibration_path, h_path, fps):
H = H_from_path(h_path)
return cls.from_calibfile(calibration_path, H, fps)
# def __init__(self, mtx, dist, w, h, H):
# self.mtx = mtx
# self.dist = dist
# self.w = w
# self.h = h
# self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(mtx, dist, (w,h), 1, (w,h))
# self.H = H # homography
def undistort_img(self, img: MatLike):
return cv2.undistort(img, self.mtx, self.dist, None, self.newcameramtx)
def undistort_points(self, distorted_points: PointList):
points = cv2.undistortPoints(np.array([distorted_points]).astype('float32'), self.mtx, self.dist, None, self.newcameramtx)
return points[0]
@dataclass
@@ -212,20 +324,22 @@ class Track:
if not self.created_at:
self.created_at = time.time()
def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[Camera]= None) -> np.array:
def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[DistortedCamera]= None) -> np.array:
foot_coordinates = [d.get_foot_coords() for d in self.history]
# TODO)) Undistort points before perspective transform
if len(foot_coordinates):
if camera:
coords = cv2.undistortPoints(np.array([foot_coordinates]).astype('float32'), camera.mtx, camera.dist, None, camera.newcameramtx)
coords = cv2.perspectiveTransform(np.array(coords),camera.H)
return coords.reshape((coords.shape[0],2))
coords = camera.points_img_to_world(foot_coordinates)
return coords
# coords = cv2.undistortPoints(np.array([foot_coordinates]).astype('float32'), camera.mtx, camera.dist, None, camera.newcameramtx)
# coords = cv2.perspectiveTransform(np.array(coords),camera.H)
# return coords.reshape((coords.shape[0],2))
else:
coords = cv2.perspectiveTransform(np.array([foot_coordinates]),H)
return coords[0]
return np.array([])
def get_projected_history_as_dict(self, H, camera: Optional[Camera]= None) -> dict:
def get_projected_history_as_dict(self, H, camera: Optional[DistortedCamera]= None) -> dict:
coords = self.get_projected_history(H, camera)
return [{"x":c[0], "y":c[1]} for c in coords]
@@ -551,10 +665,13 @@ class CameraAction(argparse.Action):
if values is None:
setattr(namespace, self.dest, None)
else:
camera = Camera.from_calibfile(Path(values), namespace.H, namespace.camera_fps)
# values = Path(values)
# with values.open('r') as fp:
# data = json.load(fp)
values = Path(values)
with values.open('r') as fp:
data = json.load(fp)
if 'type' in data and data['type'] == 'fisheye':
camera = FisheyeCamera.from_calibfile(Path(values), namespace.H, namespace.camera_fps)
else:
camera = Camera.from_calibfile(Path(values), namespace.H, namespace.camera_fps)
# # print(data)
# # print(data['camera_matrix'])
# # camera = {
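A calibration file therefore opts into the fisheye model via its `type` field; anything else falls through to the pinhole Camera. A hypothetical fisheye file with the keys from_calibfile() reads (all values placeholders):

    {
        "type": "fisheye",
        "dim1": [2448, 2048],
        "dim2": [2448, 2048],
        "dim3": [2448, 2048],
        "K": [[1000.0, 0.0, 1224.0], [0.0, 1000.0, 1024.0], [0.0, 0.0, 1.0]],
        "D": [[0.0], [0.0], [0.0], [0.0]],
        "new_K": [[900.0, 0.0, 1224.0], [0.0, 900.0, 1024.0], [0.0, 0.0, 1.0]],
        "scaled_K": [[1000.0, 0.0, 1224.0], [0.0, 1000.0, 1024.0], [0.0, 0.0, 1.0]],
        "balance": 0.5
    }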

View file

@@ -67,7 +67,7 @@ class CvRenderer:
# TODO: get FPS from frame_emitter
# self.out = cv2.VideoWriter(str(filename), fourcc, 23.97, (1280,720))
self.fps = 60
self.frame_size = (self.config.camera.w,self.config.camera.h)
self.frame_size = (self.config.camera.projected_w,self.config.camera.projected_h)
self.hide_stats = False
self.out_writer = self.start_writer() if self.config.render_file else None
self.streaming_process = self.start_streaming() if self.config.render_url else None
@@ -459,16 +459,16 @@ def get_animation_position(track: Track, current_frame: Frame):
# Deprecated
def decorate_frame(frame: Frame, tracker_frame: Frame, prediction_frame: Frame, first_time: float, config: Namespace, tracks: Dict[str, Track], predictions: Dict[str, Track], as_clusters = True, counter_listener: CounterListerner|None = None) -> np.array:
scale = 100
# TODO: replace opencv with QPainter to support alpha? https://doc.qt.io/qtforpython-5/PySide2/QtGui/QPainter.html#PySide2.QtGui.PySide2.QtGui.QPainter.drawImage
# or https://github.com/pygobject/pycairo?tab=readme-ov-file
# or https://pyglet.readthedocs.io/en/latest/programming_guide/shapes.html
# and use http://code.astraw.com/projects/motmot/pygarrayimage.html or https://gist.github.com/nkymut/1cb40ea6ae4de0cf9ded7332f1ca0d55
# or https://api.arcade.academy/en/stable/index.html (supports gradient color in line -- "Arcade is built on top of Pyglet and OpenGL.")
undistorted_img = cv2.undistort(frame.img, config.camera.mtx, config.camera.dist, None, config.camera.newcameramtx)
dst_img = cv2.warpPerspective(undistorted_img,convert_world_space_to_img_space(config.camera.H),(config.camera.w,config.camera.h))
dst_img = frame.camera.img_to_world(frame.img, scale)
# undistorted_img = cv2.undistort(frame.img, config.camera.mtx, config.camera.dist, None, config.camera.newcameramtx)
# dst_img = cv2.warpPerspective(undistorted_img,convert_world_space_to_img_space(config.camera.H),(config.camera.w,config.camera.h))
# dst_img2 = cv2.warpPerspective(undistorted_img,convert_world_space_to_img_space(config.camera.H), None)
# cv2.imwrite('/home/ruben/suspicion/DATASETS/hof3/camera2.png', dst_img2)
@@ -489,12 +489,15 @@ def decorate_frame(frame: Frame, tracker_frame: Frame, prediction_frame: Frame,
# cv2.imwrite(str(self.config.output_dir / "orig.png"), warpedFrame)
cv2.rectangle(img, (0,0), (img.shape[1],25), (0,0,0), -1)
def conversion(points):
return convert_world_points_to_img_points(points, scale)
if not tracker_frame:
cv2.putText(img, f"and track", (650,17), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1)
else:
for track_id, track in tracks.items():
inv_H = np.linalg.pinv(tracker_frame.H)
draw_track_projected(img, track, int(track_id), config.camera, convert_world_points_to_img_points)
draw_track_projected(img, track, int(track_id), config.camera, conversion)
if not prediction_frame:
cv2.putText(img, f"Waiting for prediction...", (500,17), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1)
@@ -503,9 +506,9 @@ def decorate_frame(frame: Frame, tracker_frame: Frame, prediction_frame: Frame,
for track_id, track in predictions.items():
inv_H = np.linalg.pinv(prediction_frame.H)
# For debugging:
# draw_trackjectron_history(img, track, int(track.track_id), convert_world_points_to_img_points)
# draw_trackjectron_history(img, track, int(track.track_id), conversion)
anim_position = get_animation_position(track, frame)
draw_track_predictions(img, track, int(track.track_id)+1, config.camera, convert_world_points_to_img_points, anim_position=anim_position, as_clusters=as_clusters)
draw_track_predictions(img, track, int(track.track_id)+1, config.camera, conversion, anim_position=anim_position, as_clusters=as_clusters)
cv2.putText(img, f"{len(track.predictor_history) if track.predictor_history else 'none'}", to_point(track.history[0].get_foot_coords()), cv2.FONT_HERSHEY_COMPLEX, 1, (255,255,255), 1)
if prediction_frame.maps:
for i, m in enumerate(prediction_frame.maps):

View file

@@ -34,7 +34,7 @@ class RenderablePoint():
def from_list(cls, l: List[float], color: SrgbaColor) -> RenderablePoint:
return cls([float(l[0]), float(l[1])], color)
SIMPLIFY_FACTOR_RDP = .05 # smaller is more detailed
SIMPLIFY_FACTOR_RDP = .001 # smaller is more detailed
SIMPLIFY_FACTOR_VW = 10
class SimplifyMethod(Enum):
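The tolerance drop from .05 to .001 keeps far more vertices per rendered line. Assuming Douglas-Peucker semantics (shapely shown purely as illustration; the renderer may use a different simplification library), a vertex is dropped only when it deviates from the simplified line by less than the tolerance:

    from shapely.geometry import LineString

    line = LineString([(0, 0), (0.02, 0.5), (0, 1.0), (0.02, 1.5), (0, 2.0)])
    print(len(line.simplify(0.05).coords))   # 2: the 0.02 wobble is below tolerance
    print(len(line.simplify(0.001).coords))  # 5: every vertex survives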

View file

@@ -309,7 +309,7 @@ class DrawnScenario(TrackScenario):
# if self.current_state is self.first_prediction or self.current_state is self.corrected_prediction:
# shape = np.array(shapes.YOUR if time.time() % 2 > 1 else shapes.FUTURE)
# text = "your" if time.time() % 2 > 1 else "future"
# color = SrgbaColor(0.5,0.5,0.,1.-self.lost_factor())
# color = SrgbaColor(0.5,0.5,0.5,1.-self.lost_factor())
# line = self.get_text_lines(text, shape, color)
# if not line:

View file

@@ -79,19 +79,19 @@ def get_bins(bin_size: float):
return [[bin_size, 0], [bin_size, bin_size], [0, bin_size], [-bin_size, bin_size], [-bin_size, 0], [-bin_size, -bin_size], [0, -bin_size], [bin_size, -bin_size]]
def convert_world_space_to_img_space(H: cv2.Mat):
def convert_world_space_to_img_space(H: cv2.Mat, scale=100):
"""Transform the given matrix so that it immediately converts
the points to img space"""
new_H = H.copy()
new_H[:2] = H[:2] * 100
new_H[:2] = H[:2] * scale
return new_H
def convert_world_points_to_img_points(points: Iterable):
def convert_world_points_to_img_points(points: Iterable, scale=100):
"""Transform the given matrix so that it immediately converts
the points to img space"""
if isinstance(points, np.ndarray):
return np.array(points) * 100
return [[p[0]*100, p[1]*100] for p in points]
return np.array(points) * scale
return [[p[0]*scale, p[1]*scale] for p in points]
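Existing call sites keep the old behaviour through the default of 100; a renderer drawing at a different resolution can now pass its own factor, e.g. (camera is hypothetical):

    H_img = convert_world_space_to_img_space(camera.H, scale=100)
    px = convert_world_points_to_img_points([[1.2, 0.4]], scale=100)  # -> [[120.0, 40.0]]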
def display_top(snapshot: tracemalloc.Snapshot, key_type='lineno', limit=5):
snapshot = snapshot.filter_traces((

View file

@@ -1,9 +1,12 @@
from dataclasses import dataclass
from itertools import cycle
import json
import logging
import math
from os import PathLike
from pathlib import Path
import time
from typing import Any, Generator, Iterable, List, Optional
from typing import Any, Generator, Iterable, List, Literal, Optional, Tuple
import neoapi
import cv2
import numpy as np
@@ -21,23 +24,54 @@ class VideoSource:
def __iter__(self):
for i in self.recv():
yield i
BinningValue = Literal[1, 2]
Coordinate = Tuple[int, int]
@dataclass
class GigEConfig:
identifier: Optional[str] = None
binning_h: BinningValue = 1
binning_v: BinningValue = 1
pixel_format: int = neoapi.PixelFormat_BayerRG8
post_crop_tl: Optional[Coordinate] = None
post_crop_br: Optional[Coordinate] = None
@classmethod
def from_file(cls, file: PathLike):
with open(file, 'r') as fp:
return cls(**json.load(fp))
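from_file() expands the JSON keys straight into the dataclass, so omitted fields keep their defaults. A hypothetical config (the '-B127' serial echoes the commented-out Connect() call below):

    import json
    from pathlib import Path

    Path('gige_cam.json').write_text(json.dumps({
        'identifier': '-B127',
        'binning_h': 2,
        'binning_v': 2,
        'post_crop_tl': [0, 200],
        'post_crop_br': [2448, 1800],
    }))
    config = GigEConfig.from_file('gige_cam.json')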
class GigE(VideoSource):
def __init__(self, identifier=None):
def __init__(self, config: GigEConfig):
self.config = config
self.camera = neoapi.Cam()
# self.camera.Connect('-B127')
self.camera.Connect(identifier)
self.camera.Connect(self.config.identifier)
# Default buffer mode, streaming, always returns latest frame
self.camera.SetImageBufferCount(10)
# neoAPI docs: Setting the neoapi.Cam.SetImageBufferCycleCount() to one ensures that all buffers but one are given back to the neoAPI to be re-cycled and never given to the user by the neoapi.Cam.GetImage() method.
self.camera.SetImageBufferCycleCount(1)
self.setPixelFormat(self.config.pixel_format)
if self.camera.IsConnected():
self.camera.f.PixelFormat.Set(neoapi.PixelFormat_RGB8)
self.camera.f.BinningHorizontal.Set(2)
self.camera.f.BinningVertical.Set(2)
self.pixfmt = self.camera.f.PixelFormat.Get()
# self.camera.f.PixelFormat.Set(neoapi.PixelFormat_RGB8)
self.camera.f.BinningHorizontal.Set(self.config.binning_h)
self.camera.f.BinningVertical.Set(self.config.binning_v)
print('exposure auto max', self.camera.f.ExposureAutoMaxValue.Set(20000))
print('exposure time', self.camera.f.ExposureTime.Get())
# print('exposure time max', self.camera.f.ExposureTimeGapMax.Get())
# print('exposure time min', self.camera.f.ExposureTimeGapMin.Get())
# self.pixfmt = self.camera.f.PixelFormat.Get()
def setPixelFormat(self, pixfmt):
self.pixfmt = pixfmt
self.camera.f.PixelFormat.Set(pixfmt)
# self.pixfmt = self.camera.f.PixelFormat.Get()
def recv(self):
while True:
@@ -59,8 +93,16 @@ class GigE(VideoSource):
if img.dtype == np.uint16:
img = cv2.convertScaleAbs(img, alpha=(255.0/65535.0))
img = self._crop(img)
yield img
def _crop(self, img):
tl = self.config.post_crop_tl or (0,0)
br = self.config.post_crop_br or (img.shape[1], img.shape[0])
return img[tl[1]:br[1],tl[0]:br[0],:]
class SingleCvVideoSource(VideoSource):
def recv(self):
@@ -178,7 +220,8 @@ def get_video_source(video_sources: List[UrlOrPath], camera: Camera, frame_offse
elif video_sources[0].url.scheme == 'gige':
if frame_offset:
logger.info("video-offset ignored for gige source")
return GigE(video_sources[0].url.hostname)
config = GigEConfig.from_file(Path(video_sources[0].url.netloc + video_sources[0].url.path))
return GigE(config)
else:
return FilelistSource(video_sources, offset = frame_offset, end=frame_end, loop=loop)
# os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"
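The gige:// branch above rebuilds the config path from netloc plus path because urlparse treats the first URL segment as a host. Illustration (file name hypothetical):

    from urllib.parse import urlparse

    u = urlparse('gige://gige_cam.json')
    assert u.scheme == 'gige' and u.netloc + u.path == 'gige_cam.json'
    config = GigEConfig.from_file(Path(u.netloc + u.path))
    frames = GigE(config).recv()  # generator yielding (cropped) frames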

View file

@@ -211,10 +211,10 @@ wheels = [
[[package]]
name = "baumer-neoapi"
version = "1.4.1"
source = { path = "../../Downloads/Baumer_neoAPI_1.4.1_lin_x86_64_python/wheel/baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl" }
version = "1.5.0"
source = { path = "../../Downloads/Baumer_neoAPI_1.5.0_lin_x86_64_python/wheel/baumer_neoapi-1.5.0-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl" }
wheels = [
{ filename = "baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl", hash = "sha256:22e378f98cc4112f942db88fca9de74ba34fdc91a05512312aa57adf8e2cf84e" },
{ filename = "baumer_neoapi-1.5.0-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl", hash = "sha256:ed16595f087694014ea16358b8d2ad7fe608db7608b22dc320db7dc01ae9deeb" },
]
[[package]]
@@ -2508,7 +2508,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "baumer-neoapi", path = "../../Downloads/Baumer_neoAPI_1.4.1_lin_x86_64_python/wheel/baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl" },
{ name = "baumer-neoapi", path = "../../Downloads/Baumer_neoAPI_1.5.0_lin_x86_64_python/wheel/baumer_neoapi-1.5.0-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl" },
{ name = "bytetracker", git = "https://github.com/rubenvandeven/bytetrack-pip" },
{ name = "deep-sort-realtime", specifier = ">=1.3.2,<2" },
{ name = "facenet-pytorch", specifier = ">=2.5.3" },