camera_latency/latency_measure.py

220 lines
No EOL
6.8 KiB
Python

"""
Adapted from:
More info at www.makehardware.com/webcam-latency
From: https://github.com/perrytsao/Webcam-Latency-Measurement/blob/master/Camera_latency_fps_measure.py
"""
from collections import deque
from multiprocessing import Process, Queue
import queue
import time
import timeit
import numpy as np
import pyglet
import cv2
import neoapi
# TODO make dev dependency
# or switch to cv2.QRCodeEncoder
import qrcode
video_sprite = None
qr_sprite = None
video_fps = deque([], maxlen=20)
latencies = deque([], maxlen=10)
qr_generation_latency = deque([], maxlen=20)
# QR decoder and generator
qrDecoder = cv2.QRCodeDetector()
qrEncoder = qrcode.QRCode() # or use cv2.QRCodeEncoder()
qrEncoder.border = 4
class Source:
pass
class GigE(Source):
def __init__(self):
self.camera = neoapi.Cam()
self.camera.Connect()
# Default buffer mode, streaming, always returns latest frame
self.camera.SetImageBufferCount(2)
# neoAPI docs: Setting the neoapi.Cam.SetImageBufferCycleCount()to one ensures that all buffers but one are given back to the neoAPI to be re-cycled and never given to the user by the neoapi.Cam.GetImage() method.
self.camera.SetImageBufferCycleCount(1)
if self.camera.IsConnected():
self.camera.f.PixelFormat.Set(neoapi.PixelFormat_RGB8)
self.camera.f.BinningHorizontal.Set(2)
self.camera.f.BinningVertical.Set(2)
self.pixfmt = self.camera.f.PixelFormat.Get()
self._last_timestamp = None
def recv(self):
i = self.camera.GetImage(0)
# check that we're not getting an older image from the buffer
# because buffer is LIFO
ts = i.GetTimestamp()
if self._last_timestamp is not None and self._last_timestamp > ts:
return None
self._last_timestamp = ts
if i.IsEmpty():
return None
imgarray = i.GetNPArray()
if self.pixfmt == neoapi.PixelFormat_BayerBG12:
img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
else:
img = cv2.cvtColor(imgarray, cv2.COLOR_BGR2RGB)
if img.dtype == np.uint16:
img = cv2.convertScaleAbs(img, alpha=(255.0/65535.0))
return img
def qr_detector(frame_q: Queue, intervals_q: Queue):
while True:
now, img_for_qr = frame_q.get()
code,bbox,rectifiedImage = qrDecoder.detectAndDecode(img_for_qr)
if len(code) > 0:
detected_t = float(code)
try:
intervals_q.put_nowait(now-detected_t)
except queue.Full as e:
pass
# latencies.append(now-detected_t)
source = GigE()
# pass frames to QR detector process
frame_q = Queue(2)
# pass detected intervals back from detector process
intervals_q = Queue(20)
config = pyglet.gl.Config(sample_buffers=1, samples=4)
display = pyglet.display.get_display()
screen = display.get_screens()[0]
print(screen)
window = pyglet.window.Window(width=screen.width, height=screen.height, config=config, fullscreen=True, screen=screen)
# window.set_handler('on_close', self.on_close)
# window.set_handler('on_key_press', self.on_key_press)
fps_display = pyglet.window.FPSDisplay(window=window, color=(255,0,0))
video_fps_display = pyglet.text.Label("...", 100,10, color=(255,0,0))
now_label = pyglet.text.Label("...", 500,10, color=(255,0,0))
latencies_label = pyglet.text.Label("...", 700,10, color=(255,0,0))
def check_frames(dt: float):
global video_sprite
# print('dt', dt)
img = source.recv()
if img is None:
return
now = timeit.default_timer()
video_fps.append(now)
img = cv2.flip(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), 0)
# TODO: offload to queue and multiprocessing
# img_for_qr = cv2.resize(img, (0,0), fx=.1, fy=.1)
try:
frame_q.put_nowait((now, img))
except queue.Full as e:
pass
img_data = pyglet.image.ImageData(img.shape[1], img.shape[0], 'RGB', img.tobytes())
# don't draw in batch, so that it is the background
video_sprite = pyglet.sprite.Sprite(img=img_data)
def on_refresh(dt):
global qr_sprite
if len(video_fps) < 2:
return
if qr_sprite is not None:
qr_sprite.delete() # clear texture from memory
try:
latency = intervals_q.get_nowait()
latencies.append(latency)
except queue.Empty as e:
pass
intervals = [video_fps[i] - video_fps[i-1] for i in range(1, len(video_fps))]
# intervals = video_fps[1:] - video_fps[:-1]
a = np.average(intervals)
fps = 1/a
video_fps_display.text = f"video stream: {fps:.2f} fps"
if len(latencies):
latencies_label.text = f"latency: {np.average(latencies):.4f} s" # roundtrip time between render and capture
# if qr_sprite is not None:
# return
qrEncoder.clear()
t_one = time.perf_counter()
qr_correction = np.average(qr_generation_latency) if len(qr_generation_latency) else 0
qr_corrected_time = t_one + qr_correction
qrEncoder.add_data(f"{qr_corrected_time:.6f}")
qr_data = np.array(qrEncoder.get_matrix()) # bool
qr_channel = 255 - (qr_data * 255 ).astype(np.uint8) # 0,255
qr_img = np.dstack((qr_channel,qr_channel,qr_channel))
# print(qr_img)
qr_img = cv2.resize(qr_img, (0, 0), fx = 3, fy = 3, interpolation=cv2.INTER_NEAREST)
# qr_img_data = pyglet.image.ImageData(qr_img.shape[1], qr_img.shape[0], 'L', qr_channel)
rows, cols, channels = qr_img.shape
pitch=cols*channels
data = qr_img.ravel()
texture = (pyglet.gl.GLubyte * (rows * cols * channels)) (*data)
# following helpful folkes at SO how to turn numpy array into pyglet ImageData:
# https://stackoverflow.com/questions/3165379/how-to-display-a-numpy-array-with-pyglet/3165844#3165844
qr_img_data = pyglet.image.ImageData(cols, rows, 'RGB', texture, pitch=pitch)
qr_sprite = pyglet.sprite.Sprite(img=qr_img_data, x = 100, y = 100)
# TODO: Collect and add to start time to negate qr generation latency
qr_generation_latency.append(time.perf_counter() - t_one)
# TODO: change into QR or something, and immediately compare on the capture of the frame
# with the timer at that point
now_label.text = f"{timeit.default_timer():.4f}"
def on_draw():
# global video_sprite
window.clear()
if video_sprite:
video_sprite.draw()
if qr_sprite:
qr_sprite.draw()
fps_display.draw()
video_fps_display.draw()
latencies_label.draw()
now_label.draw()
window.set_handler('on_refresh', on_refresh)
window.set_handler('on_draw', on_draw)
try:
event_loop = pyglet.app.EventLoop()
pyglet.clock.schedule(check_frames)
Process(target=qr_detector, args=(frame_q, intervals_q), daemon=True).start()
event_loop.run()
finally:
window.close()