trap/trap/video_sources.py
2025-05-12 16:51:00 +02:00

234 lines
8.8 KiB
Python

from dataclasses import dataclass
from itertools import cycle
import json
import logging
import math
from os import PathLike
from pathlib import Path
import time
from typing import Any, Generator, Iterable, List, Literal, Optional, Tuple
import neoapi
import cv2
import numpy as np
from trap.base import Camera, UrlOrPath
logger = logging.getLogger('video_source')
class VideoSource:
"""Video Frame generator
"""
def recv(self) -> Generator[Optional[cv2.typing.MatLike], Any, None]:
raise RuntimeError("Not implemented")
def __iter__(self):
for i in self.recv():
yield i
BinningValue = Literal[1, 2]
Coordinate = Tuple[int, int]
@dataclass
class GigEConfig:
identifier: Optional[str] = None
binning_h: BinningValue = 1
binning_v: BinningValue = 1
pixel_format: int = neoapi.PixelFormat_BayerRG8
post_crop_tl: Optional[Coordinate] = None
post_crop_br: Optional[Coordinate] = None
@classmethod
def from_file(cls, file: PathLike):
with open(file, 'r') as fp:
return cls(**json.load(fp))
class GigE(VideoSource):
def __init__(self, config=GigEConfig):
self.config = config
self.camera = neoapi.Cam()
# self.camera.Connect('-B127')
self.camera.Connect(self.config.identifier)
# Default buffer mode, streaming, always returns latest frame
self.camera.SetImageBufferCount(10)
# neoAPI docs: Setting the neoapi.Cam.SetImageBufferCycleCount()to one ensures that all buffers but one are given back to the neoAPI to be re-cycled and never given to the user by the neoapi.Cam.GetImage() method.
self.camera.SetImageBufferCycleCount(1)
self.setPixelFormat(self.config.pixel_format)
if self.camera.IsConnected():
# self.camera.f.PixelFormat.Set(neoapi.PixelFormat_RGB8)
self.camera.f.BinningHorizontal.Set(self.config.binning_h)
self.camera.f.BinningVertical.Set(self.config.binning_v)
# print('exposure time', self.camera.f.ExposureAutoMaxValue.Set(20000)) # shutter 1/50
print('exposure time', self.camera.f.ExposureAutoMaxValue.Set(25000))
print('brightness targt', self.camera.f.BrightnessAutoNominalValue.Get())
print('brightness targt', self.camera.f.BrightnessAutoNominalValue.Set(30))
print('exposure time', self.camera.f.ExposureTime.Get())
print('Gamma', self.camera.f.Gamma.Set(0.39))
# print('LUT', self.camera.f.LUTIndex.Get())
# print('LUT', self.camera.f.LUTEnable.Get())
# print('exposure time max', self.camera.f.ExposureTimeGapMax.Get())
# print('exposure time min', self.camera.f.ExposureTimeGapMin.Get())
# self.pixfmt = self.camera.f.PixelFormat.Get()
def setPixelFormat(self, pixfmt):
self.pixfmt = pixfmt
self.camera.f.PixelFormat.Set(pixfmt)
# self.pixfmt = self.camera.f.PixelFormat.Get()
def recv(self):
while True:
if not self.camera.IsConnected():
return
i = self.camera.GetImage(0)
if i.IsEmpty():
time.sleep(.01)
continue
imgarray = i.GetNPArray()
if self.pixfmt == neoapi.PixelFormat_BayerRG12:
img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
elif self.pixfmt == neoapi.PixelFormat_BayerRG8:
img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
else:
img = cv2.cvtColor(imgarray, cv2.COLOR_BGR2RGB)
if img.dtype == np.uint16:
img = cv2.convertScaleAbs(img, alpha=(255.0/65535.0))
img = self._crop(img)
yield img
def _crop(self, img):
tl = self.config.post_crop_tl or (0,0)
br = self.config.post_crop_br or (img.shape[1], img.shape[0])
return img[tl[1]:br[1],tl[0]:br[0],:]
class SingleCvVideoSource(VideoSource):
def recv(self):
while True:
ret, img = self.video.read()
self.frame_idx+=1
# seek to 0 if video has finished. Infinite loop
if not ret:
# now loading multiple files
break
# frame = Frame(index=self.n, img=img, H=self.camera.H, camera=self.camera)
yield img
class RtspSource(SingleCvVideoSource):
def __init__(self, video_url: str | Path, camera: Camera = None):
gst = f"rtspsrc location={video_url} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=0 drop=true"
logger.info(f"Capture gstreamer (gst-launch-1.0): {gst}")
self.video = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)
self.frame_idx = 0
class FilelistSource(SingleCvVideoSource):
def __init__(self, video_sources: Iterable[UrlOrPath], camera: Camera = None, delay = True, offset = 0, end: Optional[int] = None, loop=False):
# store current position
self.video_sources = video_sources if not loop else cycle(video_sources)
self.camera = camera
self.video_path = None
self.video_nr = None
self.frame_count = None
self.frame_idx = None
self.n = 0
self.delay_generation = delay
self.offset = offset
self.end = end
def recv(self):
prev_time = time.time()
for video_nr, video_path in enumerate(self.video_sources):
self.video_path = video_path
self.video_nr = video_nr
logger.info(f"Play from '{str(video_path)}'")
video = cv2.VideoCapture(str(video_path))
fps = video.get(cv2.CAP_PROP_FPS)
target_frame_duration = 1./fps
self.frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
if self.frame_count < 0:
self.frame_count = math.inf
self.frame_idx = 0
# TODO)) Video offset
if self.offset:
logger.info(f"Start at frame {self.offset}")
video.set(cv2.CAP_PROP_POS_FRAMES, self.offset)
self.frame_idx = self.offset
while True:
ret, img = video.read()
self.frame_idx+=1
self.n+=1
# seek to 0 if video has finished. Infinite loop
if not ret:
# now loading multiple files
break
if "DATASETS/hof/" in str(video_path):
# hack to mask out area
cv2.rectangle(img, (0,0), (800,200), (0,0,0), -1)
# frame = Frame(index=self.n, img=img, H=self.camera.H, camera=self.camera)
yield img
if self.end is not None and self.frame_idx >= self.end:
logger.info(f"Reached frame {self.end}")
break
if self.delay_generation:
# defer next loop
now = time.time()
time_diff = (now - prev_time)
if time_diff < target_frame_duration:
time.sleep(target_frame_duration - time_diff)
now += target_frame_duration - time_diff
prev_time = now
class CameraSource(SingleCvVideoSource):
def __init__(self, identifier: int, camera: Camera):
self.video = cv2.VideoCapture(identifier)
self.camera = camera
# TODO: make config variables
self.video.set(cv2.CAP_PROP_FRAME_WIDTH, int(self.camera.w))
self.video.set(cv2.CAP_PROP_FRAME_HEIGHT, int(self.camera.h))
# print("exposure!", video.get(cv2.CAP_PROP_AUTO_EXPOSURE))
self.video.set(cv2.CAP_PROP_FPS, self.camera.fps)
self.frame_idx = 0
def get_video_source(video_sources: List[UrlOrPath], camera: Camera, frame_offset=0, frame_end:Optional[int]=None, loop=False):
if str(video_sources[0]).isdigit():
# numeric input is a CV camera
if frame_offset:
logger.info("video-offset ignored for camera source")
return CameraSource(int(str(video_sources[0])), camera)
elif video_sources[0].url.scheme == 'rtsp':
# video_sources[0].url.hostname
if frame_offset:
logger.info("video-offset ignored for rtsp source")
return RtspSource(video_sources[0])
elif video_sources[0].url.scheme == 'gige':
if frame_offset:
logger.info("video-offset ignored for gige source")
config = GigEConfig.from_file(Path(video_sources[0].url.netloc + video_sources[0].url.path))
return GigE(config)
else:
return FilelistSource(video_sources, offset = frame_offset, end=frame_end, loop=loop)
# os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"