Fist laser test night

This commit is contained in:
Ruben van de Ven 2025-03-18 18:38:58 +01:00
parent e1923569ef
commit 2ade58549f
7 changed files with 601 additions and 64 deletions

45
poetry.lock generated
View file

@ -1012,6 +1012,27 @@ qtconsole = ["qtconsole"]
test = ["packaging", "pickleshare", "pytest", "pytest-asyncio (<0.22)", "testpath"]
test-extra = ["curio", "ipython[test]", "matplotlib (!=3.2.0)", "nbformat", "numpy (>=1.23)", "pandas", "trio"]
[[package]]
name = "ipywidgets"
version = "8.1.5"
description = "Jupyter interactive widgets"
optional = false
python-versions = ">=3.7"
files = [
{file = "ipywidgets-8.1.5-py3-none-any.whl", hash = "sha256:3290f526f87ae6e77655555baba4f36681c555b8bdbbff430b70e52c34c86245"},
{file = "ipywidgets-8.1.5.tar.gz", hash = "sha256:870e43b1a35656a80c18c9503bbf2d16802db1cb487eec6fab27d683381dde17"},
]
[package.dependencies]
comm = ">=0.1.3"
ipython = ">=6.1.0"
jupyterlab-widgets = ">=3.0.12,<3.1.0"
traitlets = ">=4.3.1"
widgetsnbextension = ">=4.0.12,<4.1.0"
[package.extras]
test = ["ipykernel", "jsonschema", "pytest (>=3.6.0)", "pytest-cov", "pytz"]
[[package]]
name = "isoduration"
version = "20.11.0"
@ -1362,6 +1383,17 @@ docs = ["autodoc-traits", "jinja2 (<3.2.0)", "mistune (<4)", "myst-parser", "pyd
openapi = ["openapi-core (>=0.18.0,<0.19.0)", "ruamel-yaml"]
test = ["hatch", "ipykernel", "openapi-core (>=0.18.0,<0.19.0)", "openapi-spec-validator (>=0.6.0,<0.8.0)", "pytest (>=7.0,<8)", "pytest-console-scripts", "pytest-cov", "pytest-jupyter[server] (>=0.6.2)", "pytest-timeout", "requests-mock", "ruamel-yaml", "sphinxcontrib-spelling", "strict-rfc3339", "werkzeug"]
[[package]]
name = "jupyterlab-widgets"
version = "3.0.13"
description = "Jupyter interactive widgets for JupyterLab"
optional = false
python-versions = ">=3.7"
files = [
{file = "jupyterlab_widgets-3.0.13-py3-none-any.whl", hash = "sha256:e3cda2c233ce144192f1e29914ad522b2f4c40e77214b0cc97377ca3d323db54"},
{file = "jupyterlab_widgets-3.0.13.tar.gz", hash = "sha256:a2966d385328c1942b683a8cd96b89b8dd82c8b8f81dda902bb2bc06d46f5bed"},
]
[[package]]
name = "kiwisolver"
version = "1.4.8"
@ -3932,7 +3964,18 @@ MarkupSafe = ">=2.1.1"
[package.extras]
watchdog = ["watchdog (>=2.3)"]
[[package]]
name = "widgetsnbextension"
version = "4.0.13"
description = "Jupyter interactive widgets for Jupyter Notebook"
optional = false
python-versions = ">=3.7"
files = [
{file = "widgetsnbextension-4.0.13-py3-none-any.whl", hash = "sha256:74b2692e8500525cc38c2b877236ba51d34541e6385eeed5aec15a70f88a6c71"},
{file = "widgetsnbextension-4.0.13.tar.gz", hash = "sha256:ffcb67bc9febd10234a362795f643927f4e0c05d9342c727b65d2384f8feacb6"},
]
[metadata]
lock-version = "2.0"
python-versions = "^3.10,<3.12,"
content-hash = "1327d61e8a4f6d1eee7a6bfd234edb71c7165b6c30903d49522afa52252ddcc2"
content-hash = "d491761d6cdd8952d860e353849f927d0bd951076a976ac0d5636bcecacf14d9"

View file

@ -47,6 +47,7 @@ shapely = "^1"
baumer-neoapi = {path = "../../Downloads/Baumer_neoAPI_1.4.1_lin_x86_64_python/wheel/baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl"}
qrcode = "^8.0"
pyusb = "^1.3.1"
ipywidgets = "^8.1.5"
[build-system]
requires = ["poetry-core"]

View file

@ -89,7 +89,7 @@ class FrameEmitter:
def emit_video(self, timer_counter):
i = 0
source = get_video_source(self.video_srcs, self.config.camera)
source = get_video_source(self.video_srcs, self.config.camera, int(self.config.video_offset))
for i, img in enumerate(source):
with timer_counter.get_lock():

View file

@ -582,7 +582,7 @@ class HeliosDAC():
if __name__ == "__main__":
a = HeliosDAC()
a.runQueueThread()
# a.runQueueThread()
# cal = a.generateText("hello World", 20,20,scale=10)
## print(cal)
@ -592,16 +592,28 @@ if __name__ == "__main__":
# a.newFrame(2000,cal)
# a.DoFrame()
# cal = a.generateText("hello World", 0, 0,scale=10)
# pps = 20000
# while(1):
# a.newFrame(pps,cal)
# a.DoFrame()
cal = a.loadILDfile("astroid.ild")
while(1):
for (t,n1,n2,c),f in cal:
print("playing %s,%s, %d" % (n1,n2,c))
a.newFrame(5000,f)
# cal = a.loadILDfile("ildatest.ild")
# while(1):
# for (t,n1,n2,c),f in cal:
# print("playing %s,%s, %d" % (n1,n2,c))
# a.newFrame(5000,f)
# a.DoFrame()
# a.plot(f)
pps = 200
while(1):
a.newFrame(pps,[HeliosPoint(0,200, c=(255,255,255)), #draw a square
HeliosPoint(200,200, c=(255,255,255)),
HeliosPoint(200,0, c=(255,255,255)),
HeliosPoint(0,0, c=(255,255,255))])
a.DoFrame()
# while(1):
## a.newFrame(1000,[HeliosPoint(16000,16000)])

View file

@ -0,0 +1,253 @@
# -*- coding: utf-8 -*-
"""
Example for using Helios DAC libraries in python (using C library with ctypes)
NB: If you haven't set up udev rules you need to use sudo to run the program for it to detect the DAC.
"""
from __future__ import annotations
import ctypes
import json
import math
from typing import Optional
import cv2
import numpy as np
def lerp(a: float, b: float, t: float) -> float:
"""Linear interpolate on the scale given by a to b, using t as the point on that scale.
Examples
--------
50 == lerp(0, 100, 0.5)
4.2 == lerp(1, 5, 0.8)
"""
return (1 - t) * a + t * b
class LaserFrame():
def __init__(self, paths: list[LaserPath]):
self.paths = paths
# def closest_path(cls, point, paths):
# distances = [min(p.last()-)]
# def optimise_paths_lazy(self, last_point = None):
# """Quick way to optimise order of paths
# last_point can be the ending point of previous frame.
# """
# ordered_paths = []
# if not last_point:
# ordered_paths.append(self.paths.pop(0))
# last_point = endpoint
# pass
def get_points_interpolated_by_distance(self, point_interval, last_point: Optional[LaserPoint] = None) -> list[LaserPoint]:
"""
Interpolate the gaps between paths (NOT THE PATHS THEMSELVES)
point_interval is the maximum interval at which a new point should be added
"""
points: list[LaserPoint] = []
for path in self.paths:
if last_point:
a = last_point
b = path.first()
dx = b.x - a.x
dy = b.y - a.y
distance = np.linalg.norm([dx,dy])
steps = int(distance // point_interval)
for step in range(steps+1): # have both 0 and 1 in the lerp for empty points
t = step/(steps+1)
x = int(lerp(a.x, b.x, t))
y = int(lerp(a.y, b.y, t))
points.append(LaserPoint(x,y, (0,0,0), 0, True))
# print('append', steps)
points.extend(path.points)
last_point = path.last()
return points
class LaserPath():
def __init__(self, points: list[LaserPoint] = []):
# if len(points) < 1:
# raise RuntimeError("LaserPath should have some points")
self.points = points
def last(self):
return self.points[-1]
def first(self):
return self.points[0]
class LaserPoint():
def __init__(self,x,y,c: Color = (255,0,0),i= 255,blank=False):
self.x = x
self.y = y
self.c = c
self._i = i
self.blank = blank
@property
def color(self):
if self.blank: return (0,0,0)
return self.c
@property
def i(self):
return 0 if self.blank else self._i
def circle_points(cx, cy, r, c: Color):
# r = 100
steps = r
pointlist: list[LaserPoint] = []
for i in range(steps):
x = int(cx + math.cos(i * (2*math.pi)/steps) * r)
y = int(cy + math.sin(i * (2*math.pi)/steps)* r)
pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0)))
return pointlist
def cross_points(cx, cy, r, c: Color):
# r = 100
steps = r
pointlist: list[LaserPoint] = []
for i in range(steps):
x = int(cx)
y = int(cy + r - i * 2 * r/steps)
pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0)))
path = LaserPath(pointlist)
pointlist: list[LaserPoint] = []
for i in range(steps):
y = int(cy)
x = int(cx + r - i * 2 * r/steps)
pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0)))
path2 = LaserPath(pointlist)
return [path, path2]
Color = tuple[int, int, int]
#Define point structure
class HeliosPoint(ctypes.Structure):
#_pack_=1
_fields_ = [('x', ctypes.c_uint16),
('y', ctypes.c_uint16),
('r', ctypes.c_uint8),
('g', ctypes.c_uint8),
('b', ctypes.c_uint8),
('i', ctypes.c_uint8)]
#Load and initialize library
HeliosLib = ctypes.cdll.LoadLibrary("./libHeliosDacAPI.so")
numDevices = HeliosLib.OpenDevices()
print("Found ", numDevices, "Helios DACs")
# #Create sample frames
# frames = [0 for x in range(100)]
# frameType = HeliosPoint * 1000
# x = 0
# y = 0
# for i in range(100):
# y = round(i * 0xFFF / 100)
# # y = round(50*0xFFF/100)
# frames[i] = frameType()
# for j in range(1000):
# if (j < 500):
# x = round(j * 0xFFF / 500)
# offset = 0
# else:
# offset = 0
# x = round(0xFFF - ((j - 500) * 0xFFF / 500))
# # frames[i][j] = HeliosPoint(int(x),int(y+offset),0,(x%155),0,255)
# frames[i][j] = HeliosPoint(int(x),int(y+offset),0,100,0,255)
pct =0xfff/100
r=50
# TODO)) scriptje met sliders
paths = [
# LaserPath(circle_points(10*pct, 45*pct, r, (100,0,100))),
# *cross_points(10*pct, 45*pct, r, (100,0,100)), # magenta
*cross_points(13.7*pct, 38.9*pct, r, (100,0,100)), # magenta # punt 10
*cross_points(44.3*pct, 47.0*pct, r, (0,100,0)), # groen # punt 0
*cross_points(82.5*pct, 12.7*pct, r, (100,100,100)), # wit # punt 4
*cross_points(89*pct, 49*pct, r, (0,100,100)), # cyan # punt 2
*cross_points(36*pct, 81.7*pct, r, (100,100,0)), # geel # punt 7
]
calibration_points = [
(13.7*pct, 38.9*pct, 10,),
(44.3*pct, 47.0*pct, 0),
(82.5*pct, 12.7*pct, 4),
(89*pct, 49*pct, 2),
(36*pct, 81.7*pct, 7),
]
with open('/home/ruben/suspicion/DATASETS/hof3/irl_points.json') as fp:
irl_points = json.load(fp)
src_points = []
dst_points=[]
for x, y, index in calibration_points:
src_points.append(irl_points[index])
dst_points.append([x,y])
print(src_points)
H, status = cv2.findHomography(np.array(src_points), np.array(dst_points))
print("LASER HOMOGRAPHY MATRIX")
print(H)
dst_img_points = cv2.perspectiveTransform(np.array([[irl_points[1]]]), H)
print(dst_img_points)
paths.extend([
*cross_points(dst_img_points[0][0][0], dst_img_points[0][0][1], r, (100,100,0)), # geel # punt 7
])
frame = LaserFrame(paths)
pointlist = frame.get_points_interpolated_by_distance(3)
print(len(pointlist))
#Play frames on DAC
i=0
while True:
frameType = HeliosPoint * len(pointlist)
frame = frameType()
# print(len(pointlist), last_laser_point.x, last_laser_point.y)
for j, point in enumerate(pointlist):
frame[j] = HeliosPoint(point.x, point.y, point.color[0],point.color[1], point.color[2], point.i)
# Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway
statusAttempts=0
while (statusAttempts < 512 and HeliosLib.GetStatus(0) != 1):
statusAttempts += 1
HeliosLib.WriteFrame(0, 50000, 0, ctypes.pointer(frame), len(pointlist))
# for i in range(250):
# i+=1
# for j in range(numDevices):
# statusAttempts = 0
# # Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway
# while (statusAttempts < 512 and HeliosLib.GetStatus(j) != 1):
# statusAttempts += 1
# HeliosLib.WriteFrame(j, 50000, 0, ctypes.pointer(frames[i % 100]), 1000) #Send the frame
HeliosLib.CloseDevices()

View file

@ -25,15 +25,107 @@ from pyglet import shapes
from PIL import Image
from trap.frame_emitter import DetectionState, Frame, Track, Camera
from trap.helios import HeliosDAC, HeliosPoint
# from trap.helios import HeliosDAC, HeliosPoint
from trap.preview_renderer import FrameWriter
from trap.tools import draw_track, draw_track_predictions, draw_track_projected, draw_trackjectron_history, to_point, track_predictions_to_lines
from trap.utils import convert_world_points_to_img_points, convert_world_space_to_img_space
from trap.utils import convert_world_points_to_img_points, convert_world_space_to_img_space, lerp
logger = logging.getLogger("trap.laser_renderer")
import ctypes
class LaserFrame():
def __init__(self, paths: list[LaserPath]):
self.paths = paths
# def closest_path(cls, point, paths):
# distances = [min(p.last()-)]
# def optimise_paths_lazy(self, last_point = None):
# """Quick way to optimise order of paths
# last_point can be the ending point of previous frame.
# """
# ordered_paths = []
# if not last_point:
# ordered_paths.append(self.paths.pop(0))
# last_point = endpoint
# pass
def get_points_interpolated_by_distance(self, point_interval, last_point: Optional[LaserPoint] = None) -> list[LaserPoint]:
"""
Interpolate the gaps between paths (NOT THE PATHS THEMSELVES)
point_interval is the maximum interval at which a new point should be added
"""
points: list[LaserPoint] = []
for path in self.paths:
if last_point:
a = last_point
b = path.first()
dx = b.x - a.x
dy = b.y - a.y
distance = np.linalg.norm([dx,dy])
steps = int(distance // point_interval)
for step in range(steps+1): # have both 0 and 1 in the lerp for empty points
t = step/(steps+1)
x = int(lerp(a.x, b.x, t))
y = int(lerp(a.y, b.y, t))
points.append(LaserPoint(x,y, (0,0,0), 0, True))
# print('append', steps)
points.extend(path.points)
last_point = path.last()
return points
class LaserPath():
def __init__(self, points: list[LaserPoint] = []):
# if len(points) < 1:
# raise RuntimeError("LaserPath should have some points")
self.points = points
def last(self):
return self.points[-1]
def first(self):
return self.points[0]
class LaserPoint():
def __init__(self,x,y,c: Color = (255,0,0),i= 255,blank=False):
self.x = x
self.y = y
self.c = c
self._i = i
self.blank = blank
@property
def color(self):
if self.blank: return (0,0,0)
return self.c
@property
def i(self):
return 0 if self.blank else self._i
#Define point structure
class CHeliosPoint(ctypes.Structure):
#_pack_=1
_fields_ = [('x', ctypes.c_uint16),
('y', ctypes.c_uint16),
('r', ctypes.c_uint8),
('g', ctypes.c_uint8),
('b', ctypes.c_uint8),
('i', ctypes.c_uint8)]
class LaserRenderer:
def __init__(self, config: Namespace, is_running: BaseEvent):
self.config = config
@ -68,13 +160,17 @@ class LaserRenderer:
self.tracks: Dict[str, Track] = {}
self.predictions: Dict[str, Track] = {}
self.helios = ctypes.cdll.LoadLibrary("./trap/helios_dac/libHeliosDacAPI.so")
numDevices = self.helios.OpenDevices()
logger.info(f"Found {numDevices} Helios DACs")
self.dac = HeliosDAC(debug=False)
logger.info(f"{self.dac.dev}")
logger.info(f"{self.dac.GetName()}")
logger.info(f"{self.dac.getHWVersion()}")
logger.info(f"Helios version: {self.dac.getHWVersion()}")
# self.dac = HeliosDAC(debug=False)
# logger.info(f"{self.dac.dev}")
# logger.info(f"{self.dac.GetName()}")
# logger.info(f"{self.dac.getHWVersion()}")
# logger.info(f"Helios version: {self.dac.getHWVersion()}")
# self.init_shapes()
@ -117,7 +213,48 @@ class LaserRenderer:
i=0
first_time = None
kpps = 50000
# frames = [0 for x in range(30)]
# frameTr= CHeliosPoint(int(x),int(y),20,20,20,255)
# print(frames)
pointlist_test = []
# pointlist_test.append(HeliosPoint(10,10, blank=False))
for i in range(30):
if i < 15:
# y = int(i*0xfff/500)
y = int(i*10 + 0xfff/2)
else:
# y = int((15-i)*0xfff/500)
y = int((15-i)*10 + 0xfff/2)
pointlist_test.append(LaserPoint(int(0),0xfff-y, blank=False))
# pointlist_test.append(HeliosPoint(10,0xfff, blank=False))
# pointlist_test.append(HeliosPoint(8000,8000, blank=False))
# pointlist_test.append(HeliosPoint(8000,10, blank=False))
# pointlist_test.append(HeliosPoint(10,10, blank=True))
# frameType = CHeliosPoint * len(pointlist_test)
# frame = frameType()
# for j, point in enumerate(pointlist_test):
# frame[j] = CHeliosPoint(point.x, point.y, 0,40,0,0 if point.blank else 255)
print(f"RENDER DAC\n\n\n")
last_laser_point = None
# for i in range(150):
while self.is_running.is_set():
# Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway
# statusAttempts=0
# while (statusAttempts < 512 and self.helios.GetStatus(0) != 1):
# statusAttempts += 1
# self.helios.WriteFrame(0, kpps, 0, ctypes.pointer(frame), len(pointlist))
# continue
i+=1
with timer_counter.get_lock():
timer_counter.value+=1
@ -138,17 +275,47 @@ class LaserRenderer:
except zmq.ZMQError as e:
logger.debug(f'reuse tracks')
if tracker_frame is None:
# might need to wait a few iterations before first frame comes available
time.sleep(.1)
continue
# if tracker_frame is None:
# # might need to wait a few iterations before first frame comes available
# time.sleep(.1)
# continue
if first_time is None:
if first_time is None and tracker_frame is not None:
first_time = tracker_frame.time
pointlist = render_frame_to_dac(self.dac, tracker_frame, prediction_frame, first_time, self.config, self.tracks, self.predictions, self.config.render_clusters)
self.dac.newFrame(50000, pointlist)
paths = render_frame_to_pointlist( tracker_frame, prediction_frame, first_time, self.config, self.tracks, self.predictions, self.config.render_clusters)
laserframe = LaserFrame(paths)
# pointlist=pointlist_test
# print([(p.x, p.y) for p in pointlist])
# pointlist.extend(pointlist_test)
pointlist = laserframe.get_points_interpolated_by_distance(2, last_laser_point)
# print(len(pointlist))
if len(pointlist):
last_laser_point = pointlist[-1]
frameType = CHeliosPoint * len(pointlist)
frame = frameType()
# print(len(pointlist), last_laser_point.x, last_laser_point.y)
for j, point in enumerate(pointlist):
frame[j] = CHeliosPoint(int(point.x), int(point.y), point.color[0],point.color[1], point.color[2], point.i)
# Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway
statusAttempts=0
while (statusAttempts < 512 and self.helios.GetStatus(0) != 1):
statusAttempts += 1
self.helios.WriteFrame(0, kpps, 0, ctypes.pointer(frame), len(pointlist))
# continue
# self.helios.WriteFrame(0, kpps, 0, ctypes.pointer(frame), len(pointlist))
# self.dac.newFrame(50000, pointlist)
# clear out old tracks & predictions:
@ -162,6 +329,7 @@ class LaserRenderer:
self.predictions.pop(prediction_id)
logger.info('Stopping')
self.helios.CloseDevices()
# if i>2:
@ -186,22 +354,45 @@ colorset = [
# (0,0,0),
# ]
def get_animation_position(track: Track, current_frame: Frame):
fade_duration = current_frame.camera.fps * 3
def get_animation_position(track: Track, current_frame: Frame) -> float:
fade_duration = current_frame.camera.fps * 2
diff = current_frame.index - track.history[-1].frame_nr
return max(0, min(1, diff / fade_duration))
# track.history[-1].frame_nr < (current_frame.index - current_frame.camera.fps * 3)
# track.history[-1].frame_nr < (current_frame.index - current_frame.camera.fps * 3)
def circle_points(cx, cy, r, c: Color):
r = 100
steps = 100
pointlist: list[LaserPoint] = []
for i in range(steps):
x = int(cx + math.cos(i * (2*math.pi)/steps) * r)
y = int(cy + math.sin(i * (2*math.pi)/steps)* r)
pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0)))
return pointlist
Color = tuple[int, int, int]
# derived with trap/helios_dac/calibration_points.py
# set points in the script to points from hof3/irl_points.json
laser_H =np.array([[ 2.47442963e+02, -7.01714050e+01, -9.71749119e+01],
[ 1.02328119e+01, 1.47185254e+02, 1.96295638e+02],
[-1.20921986e-03, -3.32735973e-02, 1.00000000e+00]])
def world_points_to_laser_points(points):
return cv2.perspectiveTransform(np.array([points]), laser_H)
# Deprecated
def render_frame_to_dac(dac: HeliosDAC, tracker_frame: Frame, prediction_frame: Frame, first_time: float, config: Namespace, tracks: Dict[str, Track], predictions: Dict[str, Track], as_clusters = True) -> np.array:
def render_frame_to_pointlist(tracker_frame: Optional[Frame], prediction_frame: Optional[Frame], first_time: Optional[float], config: Namespace, tracks: Dict[str, Track], predictions: Dict[str, Track], as_clusters = True):
# TODO: replace opencv with QPainter to support alpha? https://doc.qt.io/qtforpython-5/PySide2/QtGui/QPainter.html#PySide2.QtGui.PySide2.QtGui.QPainter.drawImage
# or https://github.com/pygobject/pycairo?tab=readme-ov-file
# or https://pyglet.readthedocs.io/en/latest/programming_guide/shapes.html
# and use http://code.astraw.com/projects/motmot/pygarrayimage.html or https://gist.github.com/nkymut/1cb40ea6ae4de0cf9ded7332f1ca0d55
# or https://api.arcade.academy/en/stable/index.html (supports gradient color in line -- "Arcade is built on top of Pyglet and OpenGL.")
pointlist = []
# pointlist: list[LaserPoint] = []
# frame = LaserFrame()
paths: list[LaserPath] = []
# pointlist.append(HeliosPoint(x,y, dac.palette[cindex],blank=blank))
@ -215,69 +406,99 @@ def render_frame_to_dac(dac: HeliosDAC, tracker_frame: Frame, prediction_frame:
# cv2.imwrite(str(self.config.output_dir / "orig.png"), warpedFrame)
# cv2.rectangle(img, (0,0), (img.shape[1],25), (0,0,0), -1)
c = dac.palette[4] # Green
pointlist.append(HeliosPoint(10,10, c,blank=False))
pointlist.append(HeliosPoint(10,100, c,blank=False))
pointlist.append(HeliosPoint(100,100, c,blank=False))
pointlist.append(HeliosPoint(100,10, c,blank=False))
pointlist.append(HeliosPoint(10,10, c,blank=True))
intensity = 100 # range 0-255
test_r = 100
base_c = (0,0, intensity)
track_c = (intensity,0,0)
pred_c = (0,intensity,0)
if not tracker_frame and not prediction_frame:
paths.append(
LaserPath(circle_points(0xFFF/2, 0xFFF/2, test_r, base_c))
)
# c = (0,intensity,0)#, dac.palette[4] # Green
# r = 100
# steps = 100
# for i in range(steps):
# x = int(0xFFF/2 + math.cos(i * (2*math.pi)/steps) * r)
# y = int(0xFFF/2 + math.sin(i * (2*math.pi)/steps)* r)
# pointlist.append(HeliosPoint(x, y, c, blank=i==99))
# pointlist.append(HeliosPoint(10,10, c,blank=False))
# pointlist.append(HeliosPoint(10,100, c,blank=False))
# pointlist.append(HeliosPoint(10,200, c,blank=False))
# pointlist.append(HeliosPoint(100,200, c,blank=False))
# pointlist.append(HeliosPoint(200,200, c,blank=False))
# pointlist.append(HeliosPoint(200,100, c,blank=False))
# pointlist.append(HeliosPoint(200,10, c,blank=False))
# pointlist.append(HeliosPoint(100,10, c,blank=False))
# pointlist.append(HeliosPoint(10,10, c,blank=True))
# return pointlist
# print(not tracker_frame, not prediction_frame)
if not tracker_frame:
c = dac.palette[3] # yellow
pointlist.append(HeliosPoint(110,10, c,blank=False))
pointlist.append(HeliosPoint(110,100, c,blank=False))
pointlist.append(HeliosPoint(200,100, c,blank=False))
pointlist.append(HeliosPoint(200,10, c,blank=False))
pointlist.append(HeliosPoint(110,10, c,blank=True))
c = (intensity, intensity, 0) #dac.palette[3] # yellow
paths.append(
LaserPath(circle_points(0xFFF/2+2*test_r, 0xFFF/2, test_r, track_c))
)
else:
for track_id, track in tracks.items():
inv_H = np.linalg.pinv(tracker_frame.H)
history = track.get_projected_history(camera=config.camera)
history = convert_world_points_to_img_points(history)
history = world_points_to_laser_points(history)[0]
# point_color = bgr_colors[color_index % len(bgr_colors)]
points = np.rint(history.reshape((-1,1,2))).astype(np.int32)
# print('point len',len(points))
laserpoints = []
for i, point in enumerate(points):
blank = i+1 == len(points) # last point blank
pointlist.append(HeliosPoint(point[0][0], point[0][1], dac.palette[2], blank=blank))
laserpoints.append(LaserPoint(point[0][0], point[0][1], track_c, blank=False))
path = LaserPath(laserpoints)
paths.append(path)
# draw_track_projected(img, track, int(track_id), config.camera, convert_world_points_to_img_points)
if not prediction_frame:
c = dac.palette[7] # magenta
pointlist.append(HeliosPoint(210,10, c,blank=False))
pointlist.append(HeliosPoint(210,100, c,blank=False))
pointlist.append(HeliosPoint(300,100, c,blank=False))
pointlist.append(HeliosPoint(300,10, c,blank=False))
pointlist.append(HeliosPoint(210,10, c,blank=True))
c = (intensity,0,intensity) # dac.palette[7] # magenta
paths.append(
LaserPath(circle_points(0xFFF/2+4*test_r, 0xFFF/2, test_r, pred_c))
)
# cv2.putText(img, f"Waiting for prediction...", (500,17), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1)
# continue
# elif True:
# pass
else:
for track_id, track in predictions.items():
inv_H = np.linalg.pinv(prediction_frame.H)
# For debugging:
# draw_trackjectron_history(img, track, int(track.track_id), convert_world_points_to_img_points)
anim_position = get_animation_position(track, tracker_frame)
anim_position = 1 # TODO)) calculate without video frame: get_animation_position(track, tracker_frame)
lines = track_predictions_to_lines(track, config.camera, anim_position)
if not lines:
continue
lines = [convert_world_points_to_img_points(points) for points in lines]
# cv2 only draws to integer coordinates
lines = [np.rint(points).astype(int) for points in lines]
# draw in a single pass
# line_points = line_points.reshape((1, -1,1,2))
for line in lines:
# print('prediction line')
line = world_points_to_laser_points(line)[0]
# line = convert_world_points_to_img_points(line)
line = np.rint(line).astype(int)
laserpoints = []
for i, point in enumerate(line):
blank = i+1 == len(points) # last point blank
pointlist.append(HeliosPoint(point[0], point[1], dac.palette[4], blank=blank))
laserpoints.append(LaserPoint(point[0], point[1], pred_c, blank=False))
path = LaserPath(laserpoints)
paths.append(path)
# draw_track_predictions(img, track, int(track.track_id)+1, config.camera, convert_world_points_to_img_points, anim_position=anim_position, as_clusters=as_clusters)
# cv2.putText(img, f"{len(track.predictor_history) if track.predictor_history else 'none'}", to_point(track.history[0].get_foot_coords()), cv2.FONT_HERSHEY_COMPLEX, 1, (255,255,255), 1)
return pointlist
# print(len(paths))
return paths
def run_laser_renderer(config: Namespace, is_running: BaseEvent, timer_counter):

View file

@ -84,7 +84,7 @@ class RtspSource(SingleCvVideoSource):
class FilelistSource(SingleCvVideoSource):
def __init__(self, video_sources: Iterable[UrlOrPath], camera: Camera = None, delay = True):
def __init__(self, video_sources: Iterable[UrlOrPath], camera: Camera = None, delay = True, offset = 0):
# store current position
self.video_sources = video_sources
self.camera = camera
@ -94,6 +94,7 @@ class FilelistSource(SingleCvVideoSource):
self.frame_idx = None
self.n = 0
self.delay_generation = delay
self.offset = offset
def recv(self):
prev_time = time.time()
@ -110,10 +111,10 @@ class FilelistSource(SingleCvVideoSource):
self.frame_count = math.inf
self.frame_idx = 0
# TODO)) Video offset
# if self.config.video_offset:
# logger.info(f"Start at frame {self.config.video_offset}")
# video.set(cv2.CAP_PROP_POS_FRAMES, self.config.video_offset)
# self.frame_idx = self.config.video_offset
if self.offset:
logger.info(f"Start at frame {self.offset}")
video.set(cv2.CAP_PROP_POS_FRAMES, self.offset)
self.frame_idx = self.offset
while True:
ret, img = video.read()
@ -155,16 +156,22 @@ class CameraSource(SingleCvVideoSource):
self.video.set(cv2.CAP_PROP_FPS, self.camera.fps)
self.frame_idx = 0
def get_video_source(video_sources: List[UrlOrPath], camera: Camera):
def get_video_source(video_sources: List[UrlOrPath], camera: Camera, frame_offset=0):
if str(video_sources[0]).isdigit():
# numeric input is a CV camera
if frame_offset:
logger.info("video-offset ignored for camera source")
return CameraSource(int(str(video_sources[0])), camera)
elif video_sources[0].url.scheme == 'rtsp':
video_sources[0].url.hostname
# video_sources[0].url.hostname
if frame_offset:
logger.info("video-offset ignored for rtsp source")
return RtspSource(video_sources[0])
elif video_sources[0].url.scheme == 'gige':
if frame_offset:
logger.info("video-offset ignored for gige source")
return GigE(video_sources[0].url.hostname)
else:
return FilelistSource(video_sources)
return FilelistSource(video_sources, offset = frame_offset)
# os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"