diff --git a/poetry.lock b/poetry.lock index d294491..c09afef 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1012,6 +1012,27 @@ qtconsole = ["qtconsole"] test = ["packaging", "pickleshare", "pytest", "pytest-asyncio (<0.22)", "testpath"] test-extra = ["curio", "ipython[test]", "matplotlib (!=3.2.0)", "nbformat", "numpy (>=1.23)", "pandas", "trio"] +[[package]] +name = "ipywidgets" +version = "8.1.5" +description = "Jupyter interactive widgets" +optional = false +python-versions = ">=3.7" +files = [ + {file = "ipywidgets-8.1.5-py3-none-any.whl", hash = "sha256:3290f526f87ae6e77655555baba4f36681c555b8bdbbff430b70e52c34c86245"}, + {file = "ipywidgets-8.1.5.tar.gz", hash = "sha256:870e43b1a35656a80c18c9503bbf2d16802db1cb487eec6fab27d683381dde17"}, +] + +[package.dependencies] +comm = ">=0.1.3" +ipython = ">=6.1.0" +jupyterlab-widgets = ">=3.0.12,<3.1.0" +traitlets = ">=4.3.1" +widgetsnbextension = ">=4.0.12,<4.1.0" + +[package.extras] +test = ["ipykernel", "jsonschema", "pytest (>=3.6.0)", "pytest-cov", "pytz"] + [[package]] name = "isoduration" version = "20.11.0" @@ -1362,6 +1383,17 @@ docs = ["autodoc-traits", "jinja2 (<3.2.0)", "mistune (<4)", "myst-parser", "pyd openapi = ["openapi-core (>=0.18.0,<0.19.0)", "ruamel-yaml"] test = ["hatch", "ipykernel", "openapi-core (>=0.18.0,<0.19.0)", "openapi-spec-validator (>=0.6.0,<0.8.0)", "pytest (>=7.0,<8)", "pytest-console-scripts", "pytest-cov", "pytest-jupyter[server] (>=0.6.2)", "pytest-timeout", "requests-mock", "ruamel-yaml", "sphinxcontrib-spelling", "strict-rfc3339", "werkzeug"] +[[package]] +name = "jupyterlab-widgets" +version = "3.0.13" +description = "Jupyter interactive widgets for JupyterLab" +optional = false +python-versions = ">=3.7" +files = [ + {file = "jupyterlab_widgets-3.0.13-py3-none-any.whl", hash = "sha256:e3cda2c233ce144192f1e29914ad522b2f4c40e77214b0cc97377ca3d323db54"}, + {file = "jupyterlab_widgets-3.0.13.tar.gz", hash = "sha256:a2966d385328c1942b683a8cd96b89b8dd82c8b8f81dda902bb2bc06d46f5bed"}, +] + [[package]] name = "kiwisolver" version = "1.4.8" @@ -3932,7 +3964,18 @@ MarkupSafe = ">=2.1.1" [package.extras] watchdog = ["watchdog (>=2.3)"] +[[package]] +name = "widgetsnbextension" +version = "4.0.13" +description = "Jupyter interactive widgets for Jupyter Notebook" +optional = false +python-versions = ">=3.7" +files = [ + {file = "widgetsnbextension-4.0.13-py3-none-any.whl", hash = "sha256:74b2692e8500525cc38c2b877236ba51d34541e6385eeed5aec15a70f88a6c71"}, + {file = "widgetsnbextension-4.0.13.tar.gz", hash = "sha256:ffcb67bc9febd10234a362795f643927f4e0c05d9342c727b65d2384f8feacb6"}, +] + [metadata] lock-version = "2.0" python-versions = "^3.10,<3.12," -content-hash = "1327d61e8a4f6d1eee7a6bfd234edb71c7165b6c30903d49522afa52252ddcc2" +content-hash = "d491761d6cdd8952d860e353849f927d0bd951076a976ac0d5636bcecacf14d9" diff --git a/pyproject.toml b/pyproject.toml index a6add17..c7a4822 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ shapely = "^1" baumer-neoapi = {path = "../../Downloads/Baumer_neoAPI_1.4.1_lin_x86_64_python/wheel/baumer_neoapi-1.4.1-cp34.cp35.cp36.cp37.cp38.cp39.cp310.cp311.cp312-none-linux_x86_64.whl"} qrcode = "^8.0" pyusb = "^1.3.1" +ipywidgets = "^8.1.5" [build-system] requires = ["poetry-core"] diff --git a/trap/frame_emitter.py b/trap/frame_emitter.py index f214b53..dc935c1 100644 --- a/trap/frame_emitter.py +++ b/trap/frame_emitter.py @@ -89,7 +89,7 @@ class FrameEmitter: def emit_video(self, timer_counter): i = 0 - source = get_video_source(self.video_srcs, self.config.camera) + source = get_video_source(self.video_srcs, self.config.camera, int(self.config.video_offset)) for i, img in enumerate(source): with timer_counter.get_lock(): diff --git a/trap/helios.py b/trap/helios.py index 3f6e8b1..4383440 100644 --- a/trap/helios.py +++ b/trap/helios.py @@ -582,7 +582,7 @@ class HeliosDAC(): if __name__ == "__main__": a = HeliosDAC() - a.runQueueThread() + # a.runQueueThread() # cal = a.generateText("hello World", 20,20,scale=10) ## print(cal) @@ -592,16 +592,28 @@ if __name__ == "__main__": # a.newFrame(2000,cal) # a.DoFrame() + # cal = a.generateText("hello World", 0, 0,scale=10) + # pps = 20000 + # while(1): + # a.newFrame(pps,cal) + # a.DoFrame() - cal = a.loadILDfile("astroid.ild") - while(1): - for (t,n1,n2,c),f in cal: - print("playing %s,%s, %d" % (n1,n2,c)) - a.newFrame(5000,f) + + # cal = a.loadILDfile("ildatest.ild") + # while(1): + # for (t,n1,n2,c),f in cal: + # print("playing %s,%s, %d" % (n1,n2,c)) + # a.newFrame(5000,f) # a.DoFrame() # a.plot(f) - + pps = 200 + while(1): + a.newFrame(pps,[HeliosPoint(0,200, c=(255,255,255)), #draw a square + HeliosPoint(200,200, c=(255,255,255)), + HeliosPoint(200,0, c=(255,255,255)), + HeliosPoint(0,0, c=(255,255,255))]) + a.DoFrame() # while(1): ## a.newFrame(1000,[HeliosPoint(16000,16000)]) diff --git a/trap/helios_dac/calibration_points.py b/trap/helios_dac/calibration_points.py new file mode 100644 index 0000000..6bf6aab --- /dev/null +++ b/trap/helios_dac/calibration_points.py @@ -0,0 +1,253 @@ +# -*- coding: utf-8 -*- +""" +Example for using Helios DAC libraries in python (using C library with ctypes) + +NB: If you haven't set up udev rules you need to use sudo to run the program for it to detect the DAC. +""" +from __future__ import annotations + +import ctypes +import json +import math +from typing import Optional + +import cv2 +import numpy as np + +def lerp(a: float, b: float, t: float) -> float: + """Linear interpolate on the scale given by a to b, using t as the point on that scale. + Examples + -------- + 50 == lerp(0, 100, 0.5) + 4.2 == lerp(1, 5, 0.8) + """ + return (1 - t) * a + t * b + + +class LaserFrame(): + def __init__(self, paths: list[LaserPath]): + self.paths = paths + + # def closest_path(cls, point, paths): + # distances = [min(p.last()-)] + + # def optimise_paths_lazy(self, last_point = None): + # """Quick way to optimise order of paths + # last_point can be the ending point of previous frame. + # """ + # ordered_paths = [] + # if not last_point: + # ordered_paths.append(self.paths.pop(0)) + + # last_point = endpoint + # pass + + def get_points_interpolated_by_distance(self, point_interval, last_point: Optional[LaserPoint] = None) -> list[LaserPoint]: + """ + Interpolate the gaps between paths (NOT THE PATHS THEMSELVES) + point_interval is the maximum interval at which a new point should be added + """ + points: list[LaserPoint] = [] + for path in self.paths: + if last_point: + a = last_point + b = path.first() + dx = b.x - a.x + dy = b.y - a.y + distance = np.linalg.norm([dx,dy]) + steps = int(distance // point_interval) + for step in range(steps+1): # have both 0 and 1 in the lerp for empty points + t = step/(steps+1) + x = int(lerp(a.x, b.x, t)) + y = int(lerp(a.y, b.y, t)) + points.append(LaserPoint(x,y, (0,0,0), 0, True)) + # print('append', steps) + + points.extend(path.points) + + last_point = path.last() + + return points + + + + +class LaserPath(): + def __init__(self, points: list[LaserPoint] = []): + # if len(points) < 1: + # raise RuntimeError("LaserPath should have some points") + + self.points = points + + def last(self): + return self.points[-1] + + def first(self): + return self.points[0] + +class LaserPoint(): + def __init__(self,x,y,c: Color = (255,0,0),i= 255,blank=False): + self.x = x + self.y = y + self.c = c + self._i = i + self.blank = blank + + @property + def color(self): + if self.blank: return (0,0,0) + return self.c + + @property + def i(self): + return 0 if self.blank else self._i + +def circle_points(cx, cy, r, c: Color): + # r = 100 + steps = r + pointlist: list[LaserPoint] = [] + for i in range(steps): + x = int(cx + math.cos(i * (2*math.pi)/steps) * r) + y = int(cy + math.sin(i * (2*math.pi)/steps)* r) + pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0))) + + return pointlist + +def cross_points(cx, cy, r, c: Color): + # r = 100 + steps = r + pointlist: list[LaserPoint] = [] + for i in range(steps): + x = int(cx) + y = int(cy + r - i * 2 * r/steps) + pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0))) + path = LaserPath(pointlist) + pointlist: list[LaserPoint] = [] + for i in range(steps): + y = int(cy) + x = int(cx + r - i * 2 * r/steps) + pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0))) + path2 = LaserPath(pointlist) + + return [path, path2] + +Color = tuple[int, int, int] + +#Define point structure +class HeliosPoint(ctypes.Structure): + #_pack_=1 + _fields_ = [('x', ctypes.c_uint16), + ('y', ctypes.c_uint16), + ('r', ctypes.c_uint8), + ('g', ctypes.c_uint8), + ('b', ctypes.c_uint8), + ('i', ctypes.c_uint8)] + +#Load and initialize library +HeliosLib = ctypes.cdll.LoadLibrary("./libHeliosDacAPI.so") +numDevices = HeliosLib.OpenDevices() +print("Found ", numDevices, "Helios DACs") + +# #Create sample frames +# frames = [0 for x in range(100)] +# frameType = HeliosPoint * 1000 +# x = 0 +# y = 0 +# for i in range(100): +# y = round(i * 0xFFF / 100) +# # y = round(50*0xFFF/100) +# frames[i] = frameType() +# for j in range(1000): +# if (j < 500): +# x = round(j * 0xFFF / 500) +# offset = 0 +# else: +# offset = 0 +# x = round(0xFFF - ((j - 500) * 0xFFF / 500)) + +# # frames[i][j] = HeliosPoint(int(x),int(y+offset),0,(x%155),0,255) +# frames[i][j] = HeliosPoint(int(x),int(y+offset),0,100,0,255) + +pct =0xfff/100 +r=50 + +# TODO)) scriptje met sliders + +paths = [ + # LaserPath(circle_points(10*pct, 45*pct, r, (100,0,100))), + # *cross_points(10*pct, 45*pct, r, (100,0,100)), # magenta + *cross_points(13.7*pct, 38.9*pct, r, (100,0,100)), # magenta # punt 10 + *cross_points(44.3*pct, 47.0*pct, r, (0,100,0)), # groen # punt 0 + *cross_points(82.5*pct, 12.7*pct, r, (100,100,100)), # wit # punt 4 + *cross_points(89*pct, 49*pct, r, (0,100,100)), # cyan # punt 2 + *cross_points(36*pct, 81.7*pct, r, (100,100,0)), # geel # punt 7 +] + +calibration_points = [ +(13.7*pct, 38.9*pct, 10,), +(44.3*pct, 47.0*pct, 0), +(82.5*pct, 12.7*pct, 4), +(89*pct, 49*pct, 2), +(36*pct, 81.7*pct, 7), +] + +with open('/home/ruben/suspicion/DATASETS/hof3/irl_points.json') as fp: + irl_points = json.load(fp) + +src_points = [] +dst_points=[] +for x, y, index in calibration_points: + src_points.append(irl_points[index]) + dst_points.append([x,y]) + +print(src_points) +H, status = cv2.findHomography(np.array(src_points), np.array(dst_points)) +print("LASER HOMOGRAPHY MATRIX") +print(H) +dst_img_points = cv2.perspectiveTransform(np.array([[irl_points[1]]]), H) +print(dst_img_points) + + +paths.extend([ + *cross_points(dst_img_points[0][0][0], dst_img_points[0][0][1], r, (100,100,0)), # geel # punt 7 +]) + + +frame = LaserFrame(paths) + +pointlist = frame.get_points_interpolated_by_distance(3) + + +print(len(pointlist)) +#Play frames on DAC +i=0 +while True: + + + frameType = HeliosPoint * len(pointlist) + frame = frameType() + + # print(len(pointlist), last_laser_point.x, last_laser_point.y) + + for j, point in enumerate(pointlist): + frame[j] = HeliosPoint(point.x, point.y, point.color[0],point.color[1], point.color[2], point.i) + + # Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway + statusAttempts=0 + + while (statusAttempts < 512 and HeliosLib.GetStatus(0) != 1): + statusAttempts += 1 + + HeliosLib.WriteFrame(0, 50000, 0, ctypes.pointer(frame), len(pointlist)) + +# for i in range(250): + # i+=1 + # for j in range(numDevices): + # statusAttempts = 0 + # # Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway + # while (statusAttempts < 512 and HeliosLib.GetStatus(j) != 1): + # statusAttempts += 1 + # HeliosLib.WriteFrame(j, 50000, 0, ctypes.pointer(frames[i % 100]), 1000) #Send the frame + + +HeliosLib.CloseDevices() diff --git a/trap/laser_renderer.py b/trap/laser_renderer.py index 1461239..aae43a6 100644 --- a/trap/laser_renderer.py +++ b/trap/laser_renderer.py @@ -25,15 +25,107 @@ from pyglet import shapes from PIL import Image from trap.frame_emitter import DetectionState, Frame, Track, Camera -from trap.helios import HeliosDAC, HeliosPoint +# from trap.helios import HeliosDAC, HeliosPoint from trap.preview_renderer import FrameWriter from trap.tools import draw_track, draw_track_predictions, draw_track_projected, draw_trackjectron_history, to_point, track_predictions_to_lines -from trap.utils import convert_world_points_to_img_points, convert_world_space_to_img_space +from trap.utils import convert_world_points_to_img_points, convert_world_space_to_img_space, lerp logger = logging.getLogger("trap.laser_renderer") +import ctypes + +class LaserFrame(): + def __init__(self, paths: list[LaserPath]): + self.paths = paths + + # def closest_path(cls, point, paths): + # distances = [min(p.last()-)] + + # def optimise_paths_lazy(self, last_point = None): + # """Quick way to optimise order of paths + # last_point can be the ending point of previous frame. + # """ + # ordered_paths = [] + # if not last_point: + # ordered_paths.append(self.paths.pop(0)) + + # last_point = endpoint + # pass + + def get_points_interpolated_by_distance(self, point_interval, last_point: Optional[LaserPoint] = None) -> list[LaserPoint]: + """ + Interpolate the gaps between paths (NOT THE PATHS THEMSELVES) + point_interval is the maximum interval at which a new point should be added + """ + points: list[LaserPoint] = [] + for path in self.paths: + if last_point: + a = last_point + b = path.first() + dx = b.x - a.x + dy = b.y - a.y + distance = np.linalg.norm([dx,dy]) + steps = int(distance // point_interval) + for step in range(steps+1): # have both 0 and 1 in the lerp for empty points + t = step/(steps+1) + x = int(lerp(a.x, b.x, t)) + y = int(lerp(a.y, b.y, t)) + points.append(LaserPoint(x,y, (0,0,0), 0, True)) + # print('append', steps) + + points.extend(path.points) + + last_point = path.last() + + return points + + + + +class LaserPath(): + def __init__(self, points: list[LaserPoint] = []): + # if len(points) < 1: + # raise RuntimeError("LaserPath should have some points") + + self.points = points + + def last(self): + return self.points[-1] + + def first(self): + return self.points[0] + +class LaserPoint(): + def __init__(self,x,y,c: Color = (255,0,0),i= 255,blank=False): + self.x = x + self.y = y + self.c = c + self._i = i + self.blank = blank + + @property + def color(self): + if self.blank: return (0,0,0) + return self.c + + @property + def i(self): + return 0 if self.blank else self._i + + +#Define point structure +class CHeliosPoint(ctypes.Structure): + #_pack_=1 + _fields_ = [('x', ctypes.c_uint16), + ('y', ctypes.c_uint16), + ('r', ctypes.c_uint8), + ('g', ctypes.c_uint8), + ('b', ctypes.c_uint8), + ('i', ctypes.c_uint8)] + + class LaserRenderer: def __init__(self, config: Namespace, is_running: BaseEvent): self.config = config @@ -68,13 +160,17 @@ class LaserRenderer: self.tracks: Dict[str, Track] = {} self.predictions: Dict[str, Track] = {} + self.helios = ctypes.cdll.LoadLibrary("./trap/helios_dac/libHeliosDacAPI.so") + numDevices = self.helios.OpenDevices() + logger.info(f"Found {numDevices} Helios DACs") - self.dac = HeliosDAC(debug=False) - logger.info(f"{self.dac.dev}") - logger.info(f"{self.dac.GetName()}") - logger.info(f"{self.dac.getHWVersion()}") + + # self.dac = HeliosDAC(debug=False) + # logger.info(f"{self.dac.dev}") + # logger.info(f"{self.dac.GetName()}") + # logger.info(f"{self.dac.getHWVersion()}") - logger.info(f"Helios version: {self.dac.getHWVersion()}") + # logger.info(f"Helios version: {self.dac.getHWVersion()}") # self.init_shapes() @@ -117,7 +213,48 @@ class LaserRenderer: i=0 first_time = None + kpps = 50000 + + # frames = [0 for x in range(30)] + # frameTr= CHeliosPoint(int(x),int(y),20,20,20,255) + # print(frames) + + pointlist_test = [] + # pointlist_test.append(HeliosPoint(10,10, blank=False)) + for i in range(30): + if i < 15: + # y = int(i*0xfff/500) + y = int(i*10 + 0xfff/2) + else: + # y = int((15-i)*0xfff/500) + y = int((15-i)*10 + 0xfff/2) + pointlist_test.append(LaserPoint(int(0),0xfff-y, blank=False)) + # pointlist_test.append(HeliosPoint(10,0xfff, blank=False)) + # pointlist_test.append(HeliosPoint(8000,8000, blank=False)) + # pointlist_test.append(HeliosPoint(8000,10, blank=False)) + # pointlist_test.append(HeliosPoint(10,10, blank=True)) + + # frameType = CHeliosPoint * len(pointlist_test) + # frame = frameType() + + # for j, point in enumerate(pointlist_test): + # frame[j] = CHeliosPoint(point.x, point.y, 0,40,0,0 if point.blank else 255) + + + + + print(f"RENDER DAC\n\n\n") + + last_laser_point = None + # for i in range(150): while self.is_running.is_set(): + # Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway + # statusAttempts=0 + # while (statusAttempts < 512 and self.helios.GetStatus(0) != 1): + # statusAttempts += 1 + # self.helios.WriteFrame(0, kpps, 0, ctypes.pointer(frame), len(pointlist)) + # continue + i+=1 with timer_counter.get_lock(): timer_counter.value+=1 @@ -138,17 +275,47 @@ class LaserRenderer: except zmq.ZMQError as e: logger.debug(f'reuse tracks') - if tracker_frame is None: - # might need to wait a few iterations before first frame comes available - time.sleep(.1) - continue + # if tracker_frame is None: + # # might need to wait a few iterations before first frame comes available + # time.sleep(.1) + # continue - if first_time is None: + if first_time is None and tracker_frame is not None: first_time = tracker_frame.time - pointlist = render_frame_to_dac(self.dac, tracker_frame, prediction_frame, first_time, self.config, self.tracks, self.predictions, self.config.render_clusters) + + paths = render_frame_to_pointlist( tracker_frame, prediction_frame, first_time, self.config, self.tracks, self.predictions, self.config.render_clusters) + laserframe = LaserFrame(paths) + # pointlist=pointlist_test + # print([(p.x, p.y) for p in pointlist]) + # pointlist.extend(pointlist_test) - self.dac.newFrame(50000, pointlist) + pointlist = laserframe.get_points_interpolated_by_distance(2, last_laser_point) + # print(len(pointlist)) + + if len(pointlist): + last_laser_point = pointlist[-1] + + frameType = CHeliosPoint * len(pointlist) + frame = frameType() + + # print(len(pointlist), last_laser_point.x, last_laser_point.y) + + for j, point in enumerate(pointlist): + frame[j] = CHeliosPoint(int(point.x), int(point.y), point.color[0],point.color[1], point.color[2], point.i) + + # Make 512 attempts for DAC status to be ready. After that, just give up and try to write the frame anyway + statusAttempts=0 + + while (statusAttempts < 512 and self.helios.GetStatus(0) != 1): + statusAttempts += 1 + + self.helios.WriteFrame(0, kpps, 0, ctypes.pointer(frame), len(pointlist)) + + # continue + # self.helios.WriteFrame(0, kpps, 0, ctypes.pointer(frame), len(pointlist)) + + # self.dac.newFrame(50000, pointlist) # clear out old tracks & predictions: @@ -162,6 +329,7 @@ class LaserRenderer: self.predictions.pop(prediction_id) logger.info('Stopping') + self.helios.CloseDevices() # if i>2: @@ -186,22 +354,45 @@ colorset = [ # (0,0,0), # ] -def get_animation_position(track: Track, current_frame: Frame): - fade_duration = current_frame.camera.fps * 3 +def get_animation_position(track: Track, current_frame: Frame) -> float: + fade_duration = current_frame.camera.fps * 2 diff = current_frame.index - track.history[-1].frame_nr return max(0, min(1, diff / fade_duration)) # track.history[-1].frame_nr < (current_frame.index - current_frame.camera.fps * 3) # track.history[-1].frame_nr < (current_frame.index - current_frame.camera.fps * 3) +def circle_points(cx, cy, r, c: Color): + r = 100 + steps = 100 + pointlist: list[LaserPoint] = [] + for i in range(steps): + x = int(cx + math.cos(i * (2*math.pi)/steps) * r) + y = int(cy + math.sin(i * (2*math.pi)/steps)* r) + pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0))) + + return pointlist + +Color = tuple[int, int, int] + +# derived with trap/helios_dac/calibration_points.py +# set points in the script to points from hof3/irl_points.json +laser_H =np.array([[ 2.47442963e+02, -7.01714050e+01, -9.71749119e+01], + [ 1.02328119e+01, 1.47185254e+02, 1.96295638e+02], + [-1.20921986e-03, -3.32735973e-02, 1.00000000e+00]]) + +def world_points_to_laser_points(points): + return cv2.perspectiveTransform(np.array([points]), laser_H) # Deprecated -def render_frame_to_dac(dac: HeliosDAC, tracker_frame: Frame, prediction_frame: Frame, first_time: float, config: Namespace, tracks: Dict[str, Track], predictions: Dict[str, Track], as_clusters = True) -> np.array: +def render_frame_to_pointlist(tracker_frame: Optional[Frame], prediction_frame: Optional[Frame], first_time: Optional[float], config: Namespace, tracks: Dict[str, Track], predictions: Dict[str, Track], as_clusters = True): # TODO: replace opencv with QPainter to support alpha? https://doc.qt.io/qtforpython-5/PySide2/QtGui/QPainter.html#PySide2.QtGui.PySide2.QtGui.QPainter.drawImage # or https://github.com/pygobject/pycairo?tab=readme-ov-file # or https://pyglet.readthedocs.io/en/latest/programming_guide/shapes.html # and use http://code.astraw.com/projects/motmot/pygarrayimage.html or https://gist.github.com/nkymut/1cb40ea6ae4de0cf9ded7332f1ca0d55 # or https://api.arcade.academy/en/stable/index.html (supports gradient color in line -- "Arcade is built on top of Pyglet and OpenGL.") - pointlist = [] + # pointlist: list[LaserPoint] = [] + # frame = LaserFrame() + paths: list[LaserPath] = [] # pointlist.append(HeliosPoint(x,y, dac.palette[cindex],blank=blank)) @@ -215,69 +406,99 @@ def render_frame_to_dac(dac: HeliosDAC, tracker_frame: Frame, prediction_frame: # cv2.imwrite(str(self.config.output_dir / "orig.png"), warpedFrame) # cv2.rectangle(img, (0,0), (img.shape[1],25), (0,0,0), -1) - c = dac.palette[4] # Green - pointlist.append(HeliosPoint(10,10, c,blank=False)) - pointlist.append(HeliosPoint(10,100, c,blank=False)) - pointlist.append(HeliosPoint(100,100, c,blank=False)) - pointlist.append(HeliosPoint(100,10, c,blank=False)) - pointlist.append(HeliosPoint(10,10, c,blank=True)) + intensity = 100 # range 0-255 + test_r = 100 + base_c = (0,0, intensity) + track_c = (intensity,0,0) + pred_c = (0,intensity,0) + if not tracker_frame and not prediction_frame: + paths.append( + LaserPath(circle_points(0xFFF/2, 0xFFF/2, test_r, base_c)) + ) + # c = (0,intensity,0)#, dac.palette[4] # Green + # r = 100 + # steps = 100 + # for i in range(steps): + # x = int(0xFFF/2 + math.cos(i * (2*math.pi)/steps) * r) + # y = int(0xFFF/2 + math.sin(i * (2*math.pi)/steps)* r) + # pointlist.append(HeliosPoint(x, y, c, blank=i==99)) + + # pointlist.append(HeliosPoint(10,10, c,blank=False)) + # pointlist.append(HeliosPoint(10,100, c,blank=False)) + # pointlist.append(HeliosPoint(10,200, c,blank=False)) + # pointlist.append(HeliosPoint(100,200, c,blank=False)) + # pointlist.append(HeliosPoint(200,200, c,blank=False)) + # pointlist.append(HeliosPoint(200,100, c,blank=False)) + # pointlist.append(HeliosPoint(200,10, c,blank=False)) + # pointlist.append(HeliosPoint(100,10, c,blank=False)) + # pointlist.append(HeliosPoint(10,10, c,blank=True)) + + # return pointlist + + # print(not tracker_frame, not prediction_frame) + if not tracker_frame: - c = dac.palette[3] # yellow - pointlist.append(HeliosPoint(110,10, c,blank=False)) - pointlist.append(HeliosPoint(110,100, c,blank=False)) - pointlist.append(HeliosPoint(200,100, c,blank=False)) - pointlist.append(HeliosPoint(200,10, c,blank=False)) - pointlist.append(HeliosPoint(110,10, c,blank=True)) + c = (intensity, intensity, 0) #dac.palette[3] # yellow + paths.append( + LaserPath(circle_points(0xFFF/2+2*test_r, 0xFFF/2, test_r, track_c)) + ) else: for track_id, track in tracks.items(): inv_H = np.linalg.pinv(tracker_frame.H) history = track.get_projected_history(camera=config.camera) - history = convert_world_points_to_img_points(history) + history = world_points_to_laser_points(history)[0] + # point_color = bgr_colors[color_index % len(bgr_colors)] points = np.rint(history.reshape((-1,1,2))).astype(np.int32) + # print('point len',len(points)) + laserpoints = [] for i, point in enumerate(points): - blank = i+1 == len(points) # last point blank - pointlist.append(HeliosPoint(point[0][0], point[0][1], dac.palette[2], blank=blank)) - + laserpoints.append(LaserPoint(point[0][0], point[0][1], track_c, blank=False)) + path = LaserPath(laserpoints) + paths.append(path) # draw_track_projected(img, track, int(track_id), config.camera, convert_world_points_to_img_points) if not prediction_frame: - c = dac.palette[7] # magenta - pointlist.append(HeliosPoint(210,10, c,blank=False)) - pointlist.append(HeliosPoint(210,100, c,blank=False)) - pointlist.append(HeliosPoint(300,100, c,blank=False)) - pointlist.append(HeliosPoint(300,10, c,blank=False)) - pointlist.append(HeliosPoint(210,10, c,blank=True)) + c = (intensity,0,intensity) # dac.palette[7] # magenta + paths.append( + LaserPath(circle_points(0xFFF/2+4*test_r, 0xFFF/2, test_r, pred_c)) + ) # cv2.putText(img, f"Waiting for prediction...", (500,17), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1) # continue + # elif True: + # pass else: for track_id, track in predictions.items(): inv_H = np.linalg.pinv(prediction_frame.H) # For debugging: # draw_trackjectron_history(img, track, int(track.track_id), convert_world_points_to_img_points) - anim_position = get_animation_position(track, tracker_frame) + anim_position = 1 # TODO)) calculate without video frame: get_animation_position(track, tracker_frame) lines = track_predictions_to_lines(track, config.camera, anim_position) if not lines: continue - lines = [convert_world_points_to_img_points(points) for points in lines] - # cv2 only draws to integer coordinates - lines = [np.rint(points).astype(int) for points in lines] # draw in a single pass # line_points = line_points.reshape((1, -1,1,2)) for line in lines: + # print('prediction line') + line = world_points_to_laser_points(line)[0] + # line = convert_world_points_to_img_points(line) + line = np.rint(line).astype(int) + laserpoints = [] for i, point in enumerate(line): - blank = i+1 == len(points) # last point blank - pointlist.append(HeliosPoint(point[0], point[1], dac.palette[4], blank=blank)) + laserpoints.append(LaserPoint(point[0], point[1], pred_c, blank=False)) + path = LaserPath(laserpoints) + paths.append(path) # draw_track_predictions(img, track, int(track.track_id)+1, config.camera, convert_world_points_to_img_points, anim_position=anim_position, as_clusters=as_clusters) # cv2.putText(img, f"{len(track.predictor_history) if track.predictor_history else 'none'}", to_point(track.history[0].get_foot_coords()), cv2.FONT_HERSHEY_COMPLEX, 1, (255,255,255), 1) - return pointlist + # print(len(paths)) + return paths def run_laser_renderer(config: Namespace, is_running: BaseEvent, timer_counter): diff --git a/trap/video_sources.py b/trap/video_sources.py index 380d981..1c10ca4 100644 --- a/trap/video_sources.py +++ b/trap/video_sources.py @@ -84,7 +84,7 @@ class RtspSource(SingleCvVideoSource): class FilelistSource(SingleCvVideoSource): - def __init__(self, video_sources: Iterable[UrlOrPath], camera: Camera = None, delay = True): + def __init__(self, video_sources: Iterable[UrlOrPath], camera: Camera = None, delay = True, offset = 0): # store current position self.video_sources = video_sources self.camera = camera @@ -94,6 +94,7 @@ class FilelistSource(SingleCvVideoSource): self.frame_idx = None self.n = 0 self.delay_generation = delay + self.offset = offset def recv(self): prev_time = time.time() @@ -110,10 +111,10 @@ class FilelistSource(SingleCvVideoSource): self.frame_count = math.inf self.frame_idx = 0 # TODO)) Video offset - # if self.config.video_offset: - # logger.info(f"Start at frame {self.config.video_offset}") - # video.set(cv2.CAP_PROP_POS_FRAMES, self.config.video_offset) - # self.frame_idx = self.config.video_offset + if self.offset: + logger.info(f"Start at frame {self.offset}") + video.set(cv2.CAP_PROP_POS_FRAMES, self.offset) + self.frame_idx = self.offset while True: ret, img = video.read() @@ -155,16 +156,22 @@ class CameraSource(SingleCvVideoSource): self.video.set(cv2.CAP_PROP_FPS, self.camera.fps) self.frame_idx = 0 -def get_video_source(video_sources: List[UrlOrPath], camera: Camera): +def get_video_source(video_sources: List[UrlOrPath], camera: Camera, frame_offset=0): if str(video_sources[0]).isdigit(): # numeric input is a CV camera + if frame_offset: + logger.info("video-offset ignored for camera source") return CameraSource(int(str(video_sources[0])), camera) elif video_sources[0].url.scheme == 'rtsp': - video_sources[0].url.hostname + # video_sources[0].url.hostname + if frame_offset: + logger.info("video-offset ignored for rtsp source") return RtspSource(video_sources[0]) elif video_sources[0].url.scheme == 'gige': + if frame_offset: + logger.info("video-offset ignored for gige source") return GigE(video_sources[0].url.hostname) else: - return FilelistSource(video_sources) + return FilelistSource(video_sources, offset = frame_offset) # os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"