# used for "Forward Referencing of type annotations" from __future__ import annotations import time import ffmpeg from argparse import Namespace import datetime import logging from multiprocessing import Event from multiprocessing.synchronize import Event as BaseEvent import cv2 import numpy as np import json import pyglet import pyglet.event import zmq import tempfile from pathlib import Path import shutil import math from typing import List, Optional from pyglet import shapes from PIL import Image from trap.utils import convert_world_points_to_img_points from trap.frame_emitter import DetectionState, Frame, Track, Camera logger = logging.getLogger("trap.preview") class FrameAnimation: def __init__(self, frame: Frame): self.start_time = time.time() self.frame = frame @property def t(self): duration = .2 return (time.time() - self.start_time) / duration @property def done(self): return (time.time() - self.start_time) > 5 def exponentialDecay(a, b, decay, dt): """Exponential decay as alternative to Lerp Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ """ return b + (a-b) * math.exp(-decay * dt) def relativePointToPolar(origin, point) -> tuple[float, float]: x, y = point[0] - origin[0], point[1] - origin[1] return np.sqrt(x**2 + y**2), np.arctan2(y, x) def relativePolarToPoint(origin, r, angle) -> tuple[float, float]: return r * np.cos(angle) + origin[0], r * np.sin(angle) + origin[1] PROJECTION_IMG = 0 PROJECTION_UNDISTORT = 1 PROJECTION_MAP = 2 PROJECTION_PROJECTOR = 4 class DrawnTrack: def __init__(self, track_id, track: Track, renderer: PreviewRenderer, H, draw_projection = PROJECTION_IMG, camera: Optional[Camera] = None): # self.created_at = time.time() self.draw_projection = draw_projection self.update_at = self.created_at = time.time() self.track_id = track_id self.renderer = renderer self.camera = camera self.H = H # TODO)) Move H to Camera object self.drawn_positions = [] self.drawn_predictions = [] self.drawn_pred_history = [] self.shapes: list[pyglet.shapes.Line] = [] self.pred_shapes: list[list[pyglet.shapes.Line]] = [] self.pred_history_shapes: list[pyglet.shapes.Line] = [] self.set_track(track, H) self.set_predictions(track, H) def set_track(self, track: Track, H = None): self.update_at = time.time() self.track = track # self.H = H self.coords = [d.get_foot_coords() for d in track.history] if self.draw_projection == PROJECTION_IMG else track.get_projected_history(None, self.camera) # perhaps only do in constructor: self.inv_H = np.linalg.pinv(self.H) def set_predictions(self, track: Track, H = None): pred_coords = [] pred_history_coords = [] if track.predictions: if self.draw_projection == PROJECTION_IMG: for pred_i, pred in enumerate(track.predictions): pred_coords.append(cv2.perspectiveTransform(np.array([pred]), self.inv_H)[0].tolist()) pred_history_coords = cv2.perspectiveTransform(np.array([track.predictor_history]), self.inv_H)[0].tolist() elif self.draw_projection == PROJECTION_MAP: pred_coords = [pred for pred in track.predictions] pred_history_coords = track.predictor_history self.pred_track = track self.pred_coords = pred_coords self.pred_history_coords = pred_history_coords # color = (128,0,128) if pred_i else (128, def update_drawn_positions(self, dt) -> List: ''' use dt to lerp the drawn positions in the direction of current prediction ''' # TODO: make lerp, currently quick way to get results def int_or_not(v): """quick wrapper to toggle int'ing""" return v # return int(v) # 1. track history for i, pos in enumerate(self.drawn_positions): self.drawn_positions[i][0] = self.coords[i][0] self.drawn_positions[i][1] = self.coords[i][1] # self.drawn_positions[i][0] = int_or_not(exponentialDecay(self.drawn_positions[i][0], self.coords[i][0], 16, dt)) # self.drawn_positions[i][1] = int_or_not(exponentialDecay(self.drawn_positions[i][1], self.coords[i][1], 16, dt)) # print(self.drawn_positions) if len(self.coords) > len(self.drawn_positions): self.drawn_positions.extend(self.coords[len(self.drawn_positions):]) # 2. history as seen by predictor (Trajectron) for i, pos in enumerate(self.drawn_pred_history): if len(self.pred_history_coords) > i: self.drawn_pred_history[i][0] = int_or_not(exponentialDecay(self.drawn_pred_history[i][0], self.pred_history_coords[i][0], 16, dt)) self.drawn_pred_history[i][1] = int_or_not(exponentialDecay(self.drawn_pred_history[i][1], self.pred_history_coords[i][1], 16, dt)) if len(self.pred_history_coords) > len(self.drawn_pred_history): self.drawn_pred_history.extend(self.coords[len(self.drawn_pred_history):]) # 3. predictions if len(self.pred_coords): for a, drawn_prediction in enumerate(self.drawn_predictions): for i, pos in enumerate(drawn_prediction): # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) decay = max(3, (18/i) if i else 10) # points further away move with more delay decay = 16 origin = self.drawn_positions[-1] drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) pred_r, pred_angle = relativePointToPolar(origin, self.pred_coords[a][i]) r = exponentialDecay(drawn_r, pred_r, decay, dt) angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) x, y = relativePolarToPoint(origin, r, angle) self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y) # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.pred_coords[i][0], decay, dt)) # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.pred_coords[i][1], decay, dt)) if len(self.pred_coords) > len(self.drawn_predictions): self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):]) # for a, drawn_prediction in self.drawn_predictions: # if len(self.pred_coords) > len(self.drawn_predictions): # self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):]) # self.drawn_positions = self.coords # finally: update shapes from coordinates self.update_shapes(dt) return self.drawn_positions def update_shapes(self, dt): drawn_positions = convert_world_points_to_img_points(self.coords[:500]) # TODO)) Glitch in self.drawn_positions, now also capped drawn_pred_history = convert_world_points_to_img_points(self.drawn_pred_history) drawn_predictions = [convert_world_points_to_img_points(p) for p in self.drawn_predictions] # positions = convert_world_points_to_img_points(self.drawn_predictions) # print("drawn", # drawn_positions,'self', self.drawn_positions # ) if len(self.shapes) > len(drawn_positions): self.shapes = self.shapes[:len(drawn_positions)] # for i, pos in drawn_positions.enumerate(): draw_dot = False # if False, draw line for_laser = True if True: for ci in range(1, len(drawn_positions)): x, y = [int(p) for p in drawn_positions[ci-1]] x2, y2 = [int(p) for p in drawn_positions[ci]] y, y2 = self.renderer.window.height - y, self.renderer.window.height - y2 color = [100+155*ci // len(drawn_positions)]*3 # print(x,y,x2,y2,color) if ci >= len(self.shapes): # TODO: add color2 if draw_dot: line = pyglet.shapes.Arc(x2, y2, 10, thickness=2, color=color, batch=self.renderer.batch_anim) else: line = self.renderer.gradientLine(x, y, x2, y2, 3, color, color, batch=self.renderer.batch_anim) line.opacity = 20 if not for_laser else 255 self.shapes.append(line) else: line = self.shapes[ci-1] line.x, line.y = x, y if draw_dot: line.radius = int(exponentialDecay(line.radius, 1.5, 3, dt)) else: line.x2, line.y2 = x2, y2 line.color = color if not for_laser: line.opacity = int(exponentialDecay(line.opacity, 180, 8, dt)) # TODO: basically a duplication of the above, do this smarter? # TODO: add intermediate segment color = colorset[self.track_id % len(colorset)] if False: if len(self.pred_history_shapes) > len(drawn_pred_history): self.pred_history_shapes = self.pred_history_shapes[:len(drawn_pred_history)] # for i, pos in drawn_pred_history.enumerate(): for ci in range(1, len(drawn_pred_history)): x, y = [int(p) for p in drawn_pred_history[ci-1]] x2, y2 = [int(p) for p in drawn_pred_history[ci]] y, y2 = self.renderer.window.height - y, self.renderer.window.height - y2 if ci >= len(self.pred_history_shapes): # line = self.renderer.gradientLine(x, y, x2, y2, 3, color, color, batch=self.renderer.batch_anim) line = pyglet.shapes.Line(x,y ,x2, y2, 2.5, color, batch=self.renderer.batch_anim) # line = pyglet.shapes.Arc(x2, y2, 10, thickness=2, color=color, batch=self.renderer.batch_anim) line.opacity = 120 self.pred_history_shapes.append(line) else: line = self.pred_history_shapes[ci-1] line.x, line.y = x, y line.x2, line.y2 = x2, y2 # line.radius = int(exponentialDecay(line.radius, 1.5, 3, dt)) line.color = color line.opacity = int(exponentialDecay(line.opacity, 180, 8, dt)) if True: for a, drawn_prediction in enumerate(drawn_predictions): if len(self.pred_shapes) <= a: self.pred_shapes.append([]) if len(self.pred_shapes[a]) > (len(drawn_prediction) +1): self.pred_shapes[a] = self.pred_shapes[a][:len(drawn_prediction)] # for i, pos in drawn_predictions.enumerate(): for ci in range(0, len(drawn_prediction)): if ci == 0: continue # x, y = [int(p) for p in drawn_positions[-1]] else: x, y = [int(p) for p in drawn_prediction[ci-1]] x2, y2 = [int(p) for p in drawn_prediction[ci]] y, y2 = self.renderer.window.height - y, self.renderer.window.height - y2 # color = [255,0,0] # print(x,y,x2,y2,color) if ci >= len(self.pred_shapes[a]): # TODO: add color2 # line = self.renderer.gradientLine(x, y, x2, y2, 3, color, color, batch=self.renderer.batch_anim) line = pyglet.shapes.Line(x,y ,x2, y2, 1.5, color, batch=self.renderer.batch_anim) # line = pyglet.shapes.Arc(x,y ,1.5, thickness=1.5, color=color, batch=self.renderer.batch_anim) line.opacity = 5 self.pred_shapes[a].append(line) else: line = self.pred_shapes[a][ci-1] line.x, line.y = x, y line.x2, line.y2 = x2, y2 line.color = color decay = (16/ci) if ci else 16 half = len(drawn_prediction) / 2 if ci < half: target_opacity = 60 else: target_opacity = (1 - ((ci - half) / half)) * 60 line.opacity = int(exponentialDecay(line.opacity, target_opacity, decay, dt)) class FrameWriter: """ Drop-in compatible interface with cv2.VideoWriter, but support variable framerate. See https://video.stackexchange.com/questions/25811/ffmpeg-make-video-with-non-constant-framerate-from-image-filenames """ def __init__(self, filename: str, fps: float, frame_size: tuple) -> None: self.filename = filename self.fps = fps self.frame_size = frame_size self.tmp_dir = tempfile.TemporaryDirectory(prefix="trap-output-") self.i = 0 def write(self, img: cv2.typing.MatLike): self.i += 1 cv2.imwrite(self.tmp_dir.name + f"/{self.i:07d}.png", img) def release(self): """Actually write the video""" # ffmpeg -f image2 -ts_from_file 2 -i %d.png out.mp4 logger.info(f"Write frames from {self.tmp_dir.name} to {self.filename}") encode = ( ffmpeg # the magic here is in --ts_from_file which uses the mtime of the file for the interval # this makes it possible to have non-constant intervals between frames, which is usefull # since we render frames when we get them .input(self.tmp_dir.name + "/%07d.png", format="image2", ts_from_file=2) .output(self.filename, vsync="vfr" ) # framerate=self.fps) ) logger.info(encode.compile()) encode.run() logger.info(f"Rm frame directory: {self.tmp_dir.name}") # logger.warning(f"RM DISABLED!") self.tmp_dir.cleanup() class PreviewRenderer: def __init__(self, config: Namespace, is_running: BaseEvent): self.config = config self.is_running = is_running context = zmq.Context() self.prediction_sock = context.socket(zmq.SUB) self.prediction_sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!! self.prediction_sock.setsockopt(zmq.SUBSCRIBE, b'') # self.prediction_sock.connect(config.zmq_prediction_addr if not self.config.bypass_prediction else config.zmq_trajectory_addr) self.prediction_sock.connect(config.zmq_prediction_addr) self.tracker_sock = context.socket(zmq.SUB) self.tracker_sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!! self.tracker_sock.setsockopt(zmq.SUBSCRIBE, b'') self.tracker_sock.connect(config.zmq_trajectory_addr) self.frame_sock = context.socket(zmq.SUB) self.frame_sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!! self.frame_sock.setsockopt(zmq.SUBSCRIBE, b'') self.frame_sock.connect(config.zmq_frame_addr) # TODO)) Move loading H to config.py # if self.config.homography.suffix == '.json': # with self.config.homography.open('r') as fp: # self.H = np.array(json.load(fp)) # else: # self.H = np.loadtxt(self.config.homography, delimiter=',') # print('h', self.config.H) self.H = self.config.H self.inv_H = np.linalg.pinv(self.H) # TODO: get FPS from frame_emitter # self.out = cv2.VideoWriter(str(filename), fourcc, 23.97, (1280,720)) self.fps = 60 self.frame_size = (self.config.camera.w,self.config.camera.h) self.hide_stats = False self.out_writer = self.start_writer() if self.config.render_file else None self.streaming_process = self.start_streaming() if self.config.render_url else None if self.config.render_window: pass # cv2.namedWindow("frame", cv2.WND_PROP_FULLSCREEN) # cv2.setWindowProperty("frame",cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN) else: pyglet.options["headless"] = True config = pyglet.gl.Config(sample_buffers=1, samples=4) # , fullscreen=self.config.render_window self.window = pyglet.window.Window(width=self.frame_size[0], height=self.frame_size[1], config=config, fullscreen=self.config.full_screen) self.window.set_handler('on_draw', self.on_draw) self.window.set_handler('on_refresh', self.on_refresh) self.window.set_handler('on_close', self.on_close) pyglet.gl.glClearColor(81./255, 20/255, 46./255, 0) self.fps_display = pyglet.window.FPSDisplay(window=self.window, color=(255,255,255,255)) self.fps_display.label.x = self.window.width - 50 self.fps_display.label.y = self.window.height - 17 self.fps_display.label.bold = False self.fps_display.label.font_size = 10 self.drawn_tracks: dict[str, DrawnTrack] = {} self.first_time: float|None = None self.frame: Frame|None= None self.tracker_frame: Frame|None = None self.prediction_frame: Frame|None = None self.batch_bg = pyglet.graphics.Batch() self.batch_overlay = pyglet.graphics.Batch() self.batch_anim = pyglet.graphics.Batch() self.init_shapes() self.init_labels() def init_shapes(self): ''' Due to error when running headless, we need to configure options before extending the shapes class ''' class GradientLine(shapes.Line): def __init__(self, x, y, x2, y2, width=1, color1=[255,255,255], color2=[255,255,255], batch=None, group=None): # print('colors!', colors) # assert len(colors) == 6 r, g, b, *a = color1 self._rgba1 = (r, g, b, a[0] if a else 255) r, g, b, *a = color2 self._rgba2 = (r, g, b, a[0] if a else 255) # print('rgba', self._rgba) super().__init__(x, y, x2, y2, width, color1, batch=None, group=None) # 10.2f}s" self.labels['frame_latency'].text = f"{self.frame.time - time.time():.2f}s" if self.tracker_frame: self.labels['tracker_idx'].text = f"{self.tracker_frame.index - self.frame.index}" self.labels['tracker_time'].text = f"{self.tracker_frame.time - time.time():.3f}s" self.labels['track_len'].text = f"{len(self.tracker_frame.tracks)} tracks" if self.prediction_frame: self.labels['pred_idx'].text = f"{self.prediction_frame.index - self.frame.index}" self.labels['pred_time'].text = f"{self.prediction_frame.time - time.time():.3f}s" # self.labels['track_len'].text = f"{len(self.prediction_frame.tracks)} tracks" # cv2.putText(img, f"{frame.index:06d}", (20,17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) # cv2.putText(img, f"{frame.time - first_time:.3f}s", (120,17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) # if prediction_frame: # # render Δt and Δ frames # cv2.putText(img, f"{prediction_frame.index - frame.index}", (90,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) # cv2.putText(img, f"{prediction_frame.time - time.time():.2f}s", (200,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) # cv2.putText(img, f"{len(prediction_frame.tracks)} tracks", (500,17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) # cv2.putText(img, f"h: {np.average([len(t.history or []) for t in prediction_frame.tracks.values()]):.2f}", (580,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) # cv2.putText(img, f"ph: {np.average([len(t.predictor_history or []) for t in prediction_frame.tracks.values()]):.2f}", (660,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) # cv2.putText(img, f"p: {np.average([len(t.predictions or []) for t in prediction_frame.tracks.values()]):.2f}", (740,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) # options = [] # for option in ['prediction_horizon','num_samples','full_dist','gmm_mode','z_mode', 'model_dir']: # options.append(f"{option}: {config.__dict__[option]}") # cv2.putText(img, options.pop(-1), (20,img.shape[0]-30), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) # cv2.putText(img, " | ".join(options), (20,img.shape[0]-10), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) def check_frames(self, dt): new_tracks = False try: self.frame: Frame = self.frame_sock.recv_pyobj(zmq.NOBLOCK) if not self.first_time: self.first_time = self.frame.time img = cv2.GaussianBlur(self.frame.img, (15, 15), 0) img = cv2.flip(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), 0) img = pyglet.image.ImageData(self.frame_size[0], self.frame_size[1], 'RGB', img.tobytes()) # don't draw in batch, so that it is the background self.video_sprite = pyglet.sprite.Sprite(img=img, batch=self.batch_bg) self.video_sprite.opacity = 100 except zmq.ZMQError as e: # idx = frame.index if frame else "NONE" # logger.debug(f"reuse video frame {idx}") pass try: self.prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK) new_tracks = True except zmq.ZMQError as e: pass try: self.tracker_frame: Frame = self.tracker_sock.recv_pyobj(zmq.NOBLOCK) new_tracks = True except zmq.ZMQError as e: pass if new_tracks: self.update_tracks() def update_tracks(self): """Updates the track objects and shapes. Called after setting `prediction_frame` """ # clean up # for track_id in list(self.drawn_tracks.keys()): # if track_id not in self.prediction_frame.tracks.keys(): # # TODO fade out # del self.drawn_tracks[track_id] if self.prediction_frame: for track_id, track in self.prediction_frame.tracks.items(): if track_id not in self.drawn_tracks: self.drawn_tracks[track_id] = DrawnTrack(track_id, track, self, self.prediction_frame.H) else: self.drawn_tracks[track_id].set_track(track, self.prediction_frame.H) # clean up for track_id in list(self.drawn_tracks.keys()): # TODO make delay configurable if self.drawn_tracks[track_id].update_at < time.time() - 5: # TODO fade out del self.drawn_tracks[track_id] def on_key_press(self, symbol, modifiers): print('A key was pressed, use f to hide') if symbol == ord('f'): self.window.set_fullscreen(not self.window.fullscreen) if symbol == ord('h'): self.hide_stats = not self.hide_stats def check_running(self, dt): if not self.is_running.is_set(): self.window.close() self.event_loop.exit() def on_close(self): self.is_running.clear() def on_refresh(self, dt: float): # update shapes # self.bg = for track_id, track in self.drawn_tracks.items(): track.update_drawn_positions(dt) self.refresh_labels(dt) # self.shape1 = shapes.Circle(700, 150, 100, color=(50, 0, 30), batch=self.batch_anim) # self.shape3 = shapes.Circle(800, 150, 100, color=(100, 225, 30), batch=self.batch_anim) pass def on_draw(self): self.window.clear() self.batch_bg.draw() for track in self.drawn_tracks.values(): for shape in track.shapes: shape.draw() # for some reason the batches don't work for track in self.drawn_tracks.values(): for shapes in track.pred_shapes: for shape in shapes: shape.draw() # self.batch_anim.draw() self.batch_overlay.draw() # pyglet.graphics.draw(3, pyglet.gl.GL_LINE, ("v2i", (100,200, 600,800)), ('c3B', (255,255,255, 255,255,255))) if not self.hide_stats: self.fps_display.draw() # if streaming, capture buffer and send try: if self.streaming_process or self.out_writer: buf = pyglet.image.get_buffer_manager().get_color_buffer() img_data = buf.get_image_data() data = img_data.get_data() # alternative: .get_data("RGBA", image_data.pitch) img = np.asanyarray(data).reshape((img_data.height, img_data.width, 4)) img = cv2.cvtColor(img, cv2.COLOR_BGRA2RGB) img = np.flip(img, 0) # img = cv2.flip(img, cv2.0) # cv2.imshow('frame', img) # cv2.waitKey(1) if self.streaming_process: self.streaming_process.stdin.write(img.tobytes()) if self.out_writer: self.out_writer.write(img) except Exception as e: logger.exception(e) def start_writer(self): if not self.config.output_dir.exists(): raise FileNotFoundError("Path does not exist") date_str = datetime.datetime.now().isoformat(timespec="minutes") filename = self.config.output_dir / f"render_predictions-{date_str}-{self.config.detector}.mp4" logger.info(f"Write to {filename}") return FrameWriter(str(filename), self.fps, self.frame_size) fourcc = cv2.VideoWriter_fourcc(*'vp09') return cv2.VideoWriter(str(filename), fourcc, self.fps, self.frame_size) def start_streaming(self): return ( ffmpeg .input('pipe:', format='rawvideo',codec="rawvideo", pix_fmt='bgr24', s='{}x{}'.format(*self.frame_size)) .output( self.config.render_url, #codec = "copy", # use same codecs of the original video codec='libx264', listen=1, # enables HTTP server pix_fmt="yuv420p", preset="ultrafast", tune="zerolatency", # g=f"{self.fps*2}", g=f"{60*2}", analyzeduration="2000000", probesize="1000000", f='mpegts' ) .overwrite_output() .run_async(pipe_stdin=True) ) # return process def run(self): frame = None prediction_frame = None tracker_frame = None i=0 first_time = None self.event_loop = pyglet.app.EventLoop() pyglet.clock.schedule_interval(self.check_running, 0.1) pyglet.clock.schedule(self.check_frames) self.event_loop.run() # while self.is_running.is_set(): # i+=1 # # zmq_ev = self.frame_sock.poll(timeout=2000) # # if not zmq_ev: # # # when no data comes in, loop so that is_running is checked # # continue # try: # frame: Frame = self.frame_sock.recv_pyobj(zmq.NOBLOCK) # except zmq.ZMQError as e: # # idx = frame.index if frame else "NONE" # # logger.debug(f"reuse video frame {idx}") # pass # # else: # # logger.debug(f'new video frame {frame.index}') # if frame is None: # # might need to wait a few iterations before first frame comes available # time.sleep(.1) # continue # try: # prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK) # except zmq.ZMQError as e: # logger.debug(f'reuse prediction') # if first_time is None: # first_time = frame.time # img = decorate_frame(frame, prediction_frame, first_time, self.config) # img_path = (self.config.output_dir / f"{i:05d}.png").resolve() # logger.debug(f"write frame {frame.time - first_time:.3f}s") # if self.out_writer: # self.out_writer.write(img) # if self.streaming_process: # self.streaming_process.stdin.write(img.tobytes()) # if self.config.render_window: # cv2.imshow('frame',img) # cv2.waitKey(1) logger.info('Stopping') # if i>2: if self.streaming_process: self.streaming_process.stdin.close() if self.out_writer: self.out_writer.release() if self.streaming_process: # oddly wrapped, because both close and release() take time. logger.info('wait for closing stream') self.streaming_process.wait() logger.info('stopped') # colorset = itertools.product([0,255], repeat=3) # but remove white # colorset = [(0, 0, 0), # (0, 0, 255), # (0, 255, 0), # (0, 255, 255), # (255, 0, 0), # (255, 0, 255), # (255, 255, 0) # ] colorset = [ (255,255,100), (255,100,255), (100,255,255), ] # colorset = [ # (0,0,0), # ] # Deprecated def decorate_frame(frame: Frame, prediction_frame: Frame, first_time: float, config: Namespace) -> np.array: # TODO: replace opencv with QPainter to support alpha? https://doc.qt.io/qtforpython-5/PySide2/QtGui/QPainter.html#PySide2.QtGui.PySide2.QtGui.QPainter.drawImage # or https://github.com/pygobject/pycairo?tab=readme-ov-file # or https://pyglet.readthedocs.io/en/latest/programming_guide/shapes.html # and use http://code.astraw.com/projects/motmot/pygarrayimage.html or https://gist.github.com/nkymut/1cb40ea6ae4de0cf9ded7332f1ca0d55 # or https://api.arcade.academy/en/stable/index.html (supports gradient color in line -- "Arcade is built on top of Pyglet and OpenGL.") frame.img overlay = np.zeros(frame.img.shape, np.uint8) # Fill image with red color(set each pixel to red) overlay[:] = (130, 0, 75) img = cv2.addWeighted(frame.img, .4, overlay, .6, 0) # img = frame.img.copy() # all not working: # if i == 1: # # thanks to GpG for fixing scaling issue: https://stackoverflow.com/a/39668864 # scale_factor = 1./20 # from 10m to 1000px # S = np.array([[scale_factor, 0,0],[0,scale_factor,0 ],[ 0,0,1 ]]) # new_H = S * self.H * np.linalg.inv(S) # warpedFrame = cv2.warpPerspective(img, new_H, (1000,1000)) # cv2.imwrite(str(self.config.output_dir / "orig.png"), warpedFrame) cv2.rectangle(img, (0,0), (img.shape[1],25), (0,0,0), -1) if not prediction_frame: cv2.putText(img, f"Waiting for prediction...", (500,17), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1) # continue else: inv_H = np.linalg.pinv(prediction_frame.H) for track_id, track in prediction_frame.tracks.items(): if not len(track.history): continue # coords = cv2.perspectiveTransform(np.array([prediction['history']]), self.inv_H)[0] coords = [d.get_foot_coords() for d in track.history] confirmations = [d.state == DetectionState.Confirmed for d in track.history] # logger.warning(f"{coords=}") for ci in range(1, len(coords)): start = [int(p) for p in coords[ci-1]] end = [int(p) for p in coords[ci]] # color = (255,255,255) if confirmations[ci] else (100,100,100) color = [100+155*ci/len(coords)]*3 cv2.line(img, start, end, color, 1, lineType=cv2.LINE_AA) cv2.circle(img, end, 2, color, lineType=cv2.LINE_AA) if not track.predictions or not len(track.predictions): continue color = colorset[track_id % len(colorset)] for pred_i, pred in enumerate(track.predictions): pred_coords = cv2.perspectiveTransform(np.array([pred]), inv_H)[0].tolist() # color = (128,0,128) if pred_i else (128,128,0) for ci in range(0, len(pred_coords)): if ci == 0: start = [int(p) for p in coords[-1]] # start = [0,0]? # print(start) else: start = [int(p) for p in pred_coords[ci-1]] end = [int(p) for p in pred_coords[ci]] cv2.line(img, start, end, color, 2, lineType=cv2.LINE_AA) cv2.circle(img, end, 2, color, 1, lineType=cv2.LINE_AA) for track_id, track in prediction_frame.tracks.items(): # draw tracker marker and track id last so it lies over the trajectories # this goes is a second loop so it overlays over _all_ trajectories # coords = cv2.perspectiveTransform(np.array([[track.history[-1].get_foot_coords()]]), self.inv_H)[0] coords = track.history[-1].get_foot_coords() color = colorset[track_id % len(colorset)] center = [int(p) for p in coords] cv2.circle(img, center, 6, (255,255,255), thickness=3) (l, t, r, b) = track.history[-1].to_ltrb() p1 = (l, t) p2 = (r, b) # cv2.rectangle(img, p1, p2, (255,0,0), 1) cv2.putText(img, f"{track_id} ({(track.history[-1].conf or 0):.2f})", (center[0]+8, center[1]), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.7, thickness=1, color=color, lineType=cv2.LINE_AA) base_color = (255,)*3 info_color = (255,255,0) cv2.putText(img, f"{frame.index:06d}", (20,17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) cv2.putText(img, f"{frame.time - first_time:.3f}s", (120,17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) if prediction_frame: # render Δt and Δ frames cv2.putText(img, f"{prediction_frame.index - frame.index}", (90,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) cv2.putText(img, f"{prediction_frame.time - time.time():.2f}s", (200,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) cv2.putText(img, f"{len(prediction_frame.tracks)} tracks", (500,17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) cv2.putText(img, f"h: {np.average([len(t.history or []) for t in prediction_frame.tracks.values()]):.2f}", (580,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) cv2.putText(img, f"ph: {np.average([len(t.predictor_history or []) for t in prediction_frame.tracks.values()]):.2f}", (660,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) cv2.putText(img, f"p: {np.average([len(t.predictions or []) for t in prediction_frame.tracks.values()]):.2f}", (740,17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1) options = [] for option in ['prediction_horizon','num_samples','full_dist','gmm_mode','z_mode', 'model_dir']: options.append(f"{option}: {config.__dict__[option]}") cv2.putText(img, options.pop(-1), (20,img.shape[0]-30), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) cv2.putText(img, " | ".join(options), (20,img.shape[0]-10), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1) return img def run_preview_renderer(config: Namespace, is_running: BaseEvent): renderer = PreviewRenderer(config, is_running) renderer.run()