"""
|
|
Adapted from:
|
|
More info at www.makehardware.com/webcam-latency
|
|
From: https://github.com/perrytsao/Webcam-Latency-Measurement/blob/master/Camera_latency_fps_measure.py
|
|
"""

from collections import deque
from multiprocessing import Process, Queue
import queue
import time
import timeit

import numpy as np
import pyglet
import cv2
import neoapi
# TODO make dev dependency
# or switch to cv2.QRCodeEncoder
import qrcode

video_sprite = None
qr_sprite = None

video_fps = deque([], maxlen=20)
latencies = deque([], maxlen=10)
qr_generation_latency = deque([], maxlen=20)

# QR decoder and generator
qrDecoder = cv2.QRCodeDetector()
qrEncoder = qrcode.QRCode()  # or use cv2.QRCodeEncoder()
qrEncoder.border = 4  # quiet-zone width in modules; 4 is the QR spec minimum


class Source:
    pass


class GigE(Source):
    def __init__(self):
        self.camera = neoapi.Cam()
        self.camera.Connect()
        # Default buffer mode, streaming, always returns latest frame
        self.camera.SetImageBufferCount(2)
        # neoAPI docs: Setting the neoapi.Cam.SetImageBufferCycleCount() to
        # one ensures that all buffers but one are given back to the neoAPI
        # to be re-cycled and never given to the user by the
        # neoapi.Cam.GetImage() method.
        self.camera.SetImageBufferCycleCount(1)
        if self.camera.IsConnected():
            self.camera.f.PixelFormat.Set(neoapi.PixelFormat_RGB8)
            self.camera.f.BinningHorizontal.Set(2)
            self.camera.f.BinningVertical.Set(2)
        self.pixfmt = self.camera.f.PixelFormat.Get()
        self._last_timestamp = None

    def recv(self):
        i = self.camera.GetImage(0)

        # check that we're not getting an older image from the buffer,
        # because the buffer is LIFO
        ts = i.GetTimestamp()
        if self._last_timestamp is not None and self._last_timestamp > ts:
            return None
        self._last_timestamp = ts

        if i.IsEmpty():
            return None

        imgarray = i.GetNPArray()
        if self.pixfmt == neoapi.PixelFormat_BayerBG12:
            img = cv2.cvtColor(imgarray, cv2.COLOR_BayerRG2RGB)
        else:
            img = cv2.cvtColor(imgarray, cv2.COLOR_BGR2RGB)

        if img.dtype == np.uint16:
            # scale 16-bit data down to 8-bit for display
            img = cv2.convertScaleAbs(img, alpha=(255.0 / 65535.0))
        return img
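

# A minimal sketch (not exercised below) of a plain-webcam Source using
# cv2.VideoCapture, for running the measurement without a GigE camera, as in
# the webcam setup this script was adapted from. The Webcam name and the
# `index` parameter are illustrative; it assumes the same recv() contract as
# GigE: return an RGB frame, or None when no frame is available.
class Webcam(Source):
    def __init__(self, index=0):
        self.capture = cv2.VideoCapture(index)

    def recv(self):
        ok, frame = self.capture.read()  # OpenCV delivers BGR
        if not ok:
            return None
        return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)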


def qr_detector(frame_q: Queue, intervals_q: Queue):
    # Runs in a separate process so QR decoding doesn't stall the render
    # loop: receives (arrival_time, frame) pairs and reports the difference
    # between arrival time and the timestamp encoded in the QR code.
    while True:
        now, img_for_qr = frame_q.get()

        code, bbox, rectifiedImage = qrDecoder.detectAndDecode(img_for_qr)
        if len(code) > 0:
            detected_t = float(code)
            try:
                intervals_q.put_nowait(now - detected_t)
            except queue.Full:
                pass
            # latencies.append(now-detected_t)


source = GigE()

# pass frames to QR detector process
frame_q = Queue(2)
# pass detected intervals back from detector process
intervals_q = Queue(20)
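
# Both queues are bounded and fed with put_nowait(), so frames are dropped
# when the detector can't keep up instead of piling up as stale work.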


config = pyglet.gl.Config(sample_buffers=1, samples=4)

display = pyglet.display.get_display()
screen = display.get_screens()[0]
print(screen)
window = pyglet.window.Window(width=screen.width, height=screen.height, config=config, fullscreen=True, screen=screen)
# window.set_handler('on_close', self.on_close)
# window.set_handler('on_key_press', self.on_key_press)

fps_display = pyglet.window.FPSDisplay(window=window, color=(255, 0, 0))
# x/y passed as keywords so they can't be mistaken for font arguments
video_fps_display = pyglet.text.Label("...", x=100, y=10, color=(255, 0, 0))
now_label = pyglet.text.Label("...", x=500, y=10, color=(255, 0, 0))
latencies_label = pyglet.text.Label("...", x=700, y=10, color=(255, 0, 0))


def check_frames(dt: float):
    global video_sprite
    # print('dt', dt)
    img = source.recv()
    if img is None:
        return

    now = timeit.default_timer()
    video_fps.append(now)
    # flip vertically because pyglet's ImageData origin is bottom-left
    img = cv2.flip(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), 0)

    # TODO: offload to queue and multiprocessing
    # img_for_qr = cv2.resize(img, (0,0), fx=.1, fy=.1)
    try:
        frame_q.put_nowait((now, img))
    except queue.Full:
        pass

    img_data = pyglet.image.ImageData(img.shape[1], img.shape[0], 'RGB', img.tobytes())
    # don't draw in batch, so that it is the background
    video_sprite = pyglet.sprite.Sprite(img=img_data)


def on_refresh(dt):
    global qr_sprite

    if len(video_fps) < 2:
        return

    if qr_sprite is not None:
        qr_sprite.delete()  # clear texture from memory

    try:
        latency = intervals_q.get_nowait()
        latencies.append(latency)
    except queue.Empty:
        pass

    intervals = [video_fps[i] - video_fps[i - 1] for i in range(1, len(video_fps))]
    # intervals = video_fps[1:] - video_fps[:-1]
    a = np.average(intervals)
    fps = 1 / a
    video_fps_display.text = f"video stream: {fps:.2f} fps"
    if len(latencies):
        latencies_label.text = f"latency: {np.average(latencies):.4f} s"  # roundtrip time between render and capture

    # if qr_sprite is not None:
    #     return

    qrEncoder.clear()
    t_one = time.perf_counter()
    # add the average generation time so the encoded timestamp is closer to
    # the moment the QR code actually appears on screen
    qr_correction = np.average(qr_generation_latency) if len(qr_generation_latency) else 0
    qr_corrected_time = t_one + qr_correction
    qrEncoder.add_data(f"{qr_corrected_time:.6f}")
    qr_data = np.array(qrEncoder.get_matrix())  # bool
    qr_channel = 255 - (qr_data * 255).astype(np.uint8)  # 0 or 255
    qr_img = np.dstack((qr_channel, qr_channel, qr_channel))
    # print(qr_img)
    qr_img = cv2.resize(qr_img, (0, 0), fx=3, fy=3, interpolation=cv2.INTER_NEAREST)

    # qr_img_data = pyglet.image.ImageData(qr_img.shape[1], qr_img.shape[0], 'L', qr_channel)
    rows, cols, channels = qr_img.shape
    pitch = cols * channels
    data = qr_img.ravel()
    texture = (pyglet.gl.GLubyte * (rows * cols * channels))(*data)
    # thanks to the helpful folks at SO for how to turn a numpy array into pyglet ImageData:
    # https://stackoverflow.com/questions/3165379/how-to-display-a-numpy-array-with-pyglet/3165844#3165844
    qr_img_data = pyglet.image.ImageData(cols, rows, 'RGB', texture, pitch=pitch)

    qr_sprite = pyglet.sprite.Sprite(img=qr_img_data, x=100, y=100)

    # TODO: Collect and add to start time to negate qr generation latency
    qr_generation_latency.append(time.perf_counter() - t_one)

    # TODO: change into QR or something, and immediately compare on the capture of the frame
    # with the timer at that point
    now_label.text = f"{timeit.default_timer():.4f}"


def on_draw():
    # global video_sprite
    window.clear()
    if video_sprite:
        video_sprite.draw()
    if qr_sprite:
        qr_sprite.draw()

    fps_display.draw()
    video_fps_display.draw()
    latencies_label.draw()
    now_label.draw()


window.set_handler('on_refresh', on_refresh)
window.set_handler('on_draw', on_draw)

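# NOTE: the detector Process below is started at module level without an
# `if __name__ == "__main__":` guard. That works with the fork start method
# (the Linux default) but would need a guard on spawn-based platforms such as
# Windows and macOS.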
try:
    event_loop = pyglet.app.EventLoop()
    pyglet.clock.schedule(check_frames)  # poll the camera on every clock tick
    Process(target=qr_detector, args=(frame_q, intervals_q), daemon=True).start()
    event_loop.run()
finally:
    window.close()
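
# Usage note: point the camera at the monitor running this script so the
# rendered QR code is in view; the on-screen "latency" readout is then the
# averaged render-to-capture round trip recovered from decoded QR timestamps.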