WIP refactor to keep predictions longer

This commit is contained in:
Ruben van de Ven 2024-10-02 12:05:53 +02:00
parent c3263e7448
commit e837617e39

View file

@ -18,6 +18,7 @@ import tempfile
from pathlib import Path from pathlib import Path
import shutil import shutil
import math import math
from collections import deque
from pyglet import shapes from pyglet import shapes
from PIL import Image from PIL import Image
@ -62,11 +63,13 @@ class DrawnTrack:
self.update_at = self.created_at = time.time() self.update_at = self.created_at = time.time()
self.track_id = track_id self.track_id = track_id
self.renderer = renderer self.renderer = renderer
self.set_track(track, H)
self.drawn_positions = [] self.drawn_positions = []
self.drawn_predictions = [] self.drawn_predictions = []
self.predictions: deque[DrawnPrediction] = deque(maxlen=20) # TODO; make configurable
self.shapes: list[pyglet.shapes.Line] = [] self.shapes: list[pyglet.shapes.Line] = []
self.pred_shapes: list[list[pyglet.shapes.Line]] = []
self.set_track(track, H)
self.set_prediction(track)
def set_track(self, track: Track, H): def set_track(self, track: Track, H):
self.update_at = time.time() self.update_at = time.time()
@ -78,11 +81,17 @@ class DrawnTrack:
# perhaps only do in constructor: # perhaps only do in constructor:
self.inv_H = np.linalg.pinv(self.H) self.inv_H = np.linalg.pinv(self.H)
def set_prediction(self, track: Track):
# TODO: turn into add_prediction
pred_coords = [] pred_coords = []
if not track.predictions:
return
for pred_i, pred in enumerate(track.predictions): for pred_i, pred in enumerate(track.predictions):
pred_coords.append(cv2.perspectiveTransform(np.array([pred]), self.inv_H)[0].tolist()) pred_coords.append(cv2.perspectiveTransform(np.array([pred]), self.inv_H)[0].tolist())
self.pred_coords = pred_coords # self.pred_coords = pred_coords
self.predictions.append(DrawnPrediction(self, pred_coords))
# color = (128,0,128) if pred_i else (128, # color = (128,0,128) if pred_i else (128,
@ -98,23 +107,26 @@ class DrawnTrack:
if len(self.coords) > len(self.drawn_positions): if len(self.coords) > len(self.drawn_positions):
self.drawn_positions.extend(self.coords[len(self.drawn_positions):]) self.drawn_positions.extend(self.coords[len(self.drawn_positions):])
for a, drawn_prediction in enumerate(self.drawn_predictions): # Superseded by individual drawnprediction elements
for i, pos in enumerate(drawn_prediction): # for a, drawn_prediction in enumerate(self.drawn_predictions):
# TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) # for i, pos in enumerate(drawn_prediction):
decay = max(3, (18/i) if i else 10) # points further away move with more delay # # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
decay = 6 # decay = max(3, (18/i) if i else 10) # points further away move with more delay
origin = self.drawn_positions[-1] # decay = 6
drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) # origin = self.drawn_positions[-1]
pred_r, pred_angle = relativePointToPolar(origin, self.pred_coords[a][i]) # drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
r = exponentialDecay(drawn_r, pred_r, decay, dt) # pred_r, pred_angle = relativePointToPolar(origin, self.pred_coords[a][i])
angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) # r = exponentialDecay(drawn_r, pred_r, decay, dt)
x, y = relativePolarToPoint(origin, r, angle) # angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
self.drawn_predictions[a][i] = int(x), int(y) # x, y = relativePolarToPoint(origin, r, angle)
# self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.pred_coords[i][0], decay, dt)) # self.drawn_predictions[a][i] = int(x), int(y)
# self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.pred_coords[i][1], decay, dt)) # # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.pred_coords[i][0], decay, dt))
# # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.pred_coords[i][1], decay, dt))
# if len(self.pred_coords) > len(self.drawn_predictions):
# self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):])
if len(self.pred_coords) > len(self.drawn_predictions):
self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):])
# for a, drawn_prediction in self.drawn_predictions: # for a, drawn_prediction in self.drawn_predictions:
# if len(self.pred_coords) > len(self.drawn_predictions): # if len(self.pred_coords) > len(self.drawn_predictions):
# self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):]) # self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):])
@ -192,6 +204,55 @@ class DrawnTrack:
target_opacity = (1 - ((ci - half) / half)) * 180 target_opacity = (1 - ((ci - half) / half)) * 180
line.opacity = int(exponentialDecay(line.opacity, target_opacity, decay, dt)) line.opacity = int(exponentialDecay(line.opacity, target_opacity, decay, dt))
class DrawnPrediction:
def __init__(self, drawn_track: DrawnTrack, coords: list[list] = []):
self.created_at = time.time()
# self.renderer = renderer
self.drawn_track = drawn_track
self.coords = coords
self.color = colorset[self.drawn_track.track_id % len(colorset)]
self.pred_shapes: list[list[pyglet.shapes.Line]] = []
# coords is a list of predictions
for a, coords in enumerate(self.coords):
prediction_shapes = []
for ci in range(0, len(coords)):
if ci == 0:
x, y = [int(p) for p in self.drawn_track.coords[-1]]
else:
x, y = [int(p) for p in coords[ci-1]]
x2, y2 = [int(p) for p in coords[ci]]
# flip in window:
y, y2 = self.drawn_track.renderer.window.height - y, self.drawn_track.renderer.window.height - y2
line = self.drawn_track.renderer.gradientLine(x, y, x2, y2, 3, self.color, self.color, batch=self.drawn_track.renderer.batch_anim)
line.opacity = 5
prediction_shapes.append(line)
self.pred_shapes.append(prediction_shapes)
def update_opacities(self, dt: float):
"""
Update the opacties of the drawn line, by only using the dt provided by the renderer
Done using exponential decal, with a different decay value per item
"""
for a, coords in enumerate(self.coords):
for ci in range(0, len(coords)):
line = self.pred_shapes[a][ci-1]
# Positions of prediction no longer update
# line.x, line.y = x, y
# line.x2, line.y2 = x2, y2
# line.color = color
decay = (16/ci) if ci else 16
half = len(coords) / 2
if ci < half:
target_opacity = 180
else:
target_opacity = (1 - ((ci - half) / half)) * 180
line.opacity = int(exponentialDecay(line.opacity, target_opacity, decay, dt))
class FrameWriter: class FrameWriter:
""" """
@ -415,14 +476,14 @@ class Renderer:
def check_frames(self, dt): def check_frames(self, dt):
new_tracks = False
try: try:
self.frame: Frame = self.frame_sock.recv_pyobj(zmq.NOBLOCK) self.frame: Frame = self.frame_sock.recv_pyobj(zmq.NOBLOCK)
if not self.first_time: if not self.first_time:
self.first_time = self.frame.time self.first_time = self.frame.time
img = cv2.GaussianBlur(self.frame.img, (15, 15), 0) img = cv2.GaussianBlur(self.frame.img, (15, 15), 0)
img = cv2.flip(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), 0) img = cv2.cvtColor(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2RGB)
img = pyglet.image.ImageData(self.frame_size[0], self.frame_size[1], 'RGB', img.tobytes()) channels = 3 # unfortunately, pyglet seems to draw single channel as Red only
img = pyglet.image.ImageData(self.frame_size[0], self.frame_size[1], 'RGB', img.tobytes(), pitch=self.frame_size[0] * -1 * channels)
# don't draw in batch, so that it is the background # don't draw in batch, so that it is the background
self.video_sprite = pyglet.sprite.Sprite(img=img, batch=self.batch_bg) self.video_sprite = pyglet.sprite.Sprite(img=img, batch=self.batch_bg)
self.video_sprite.opacity = 100 self.video_sprite.opacity = 100
@ -432,17 +493,15 @@ class Renderer:
pass pass
try: try:
self.prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK) self.prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK)
new_tracks = True self.update_predictions()
except zmq.ZMQError as e: except zmq.ZMQError as e:
pass pass
try: try:
self.tracker_frame: Frame = self.tracker_sock.recv_pyobj(zmq.NOBLOCK) self.tracker_frame: Frame = self.tracker_sock.recv_pyobj(zmq.NOBLOCK)
new_tracks = True self.update_tracks()
except zmq.ZMQError as e: except zmq.ZMQError as e:
pass pass
if new_tracks:
self.update_tracks()
def update_tracks(self): def update_tracks(self):
"""Updates the track objects and shapes. Called after setting `prediction_frame` """Updates the track objects and shapes. Called after setting `prediction_frame`
@ -454,18 +513,30 @@ class Renderer:
# # TODO fade out # # TODO fade out
# del self.drawn_tracks[track_id] # del self.drawn_tracks[track_id]
if self.tracker_frame:
for track_id, track in self.tracker_frame.tracks.items():
if track_id not in self.drawn_tracks:
self.drawn_tracks[track_id] = DrawnTrack(track_id, track, self, self.tracker_frame.H)
else:
self.drawn_tracks[track_id].set_track(track, self.tracker_frame.H)
# clean up
for track_id in list(self.drawn_tracks.keys()):
# TODO make delay configurable
if self.drawn_tracks[track_id].update_at < time.time() - 5:
# TODO fade out
del self.drawn_tracks[track_id]
def update_predictions(self):
if self.prediction_frame: if self.prediction_frame:
for track_id, track in self.prediction_frame.tracks.items(): for track_id, track in self.prediction_frame.tracks.items():
if track_id not in self.drawn_tracks: if track_id not in self.drawn_tracks:
self.drawn_tracks[track_id] = DrawnTrack(track_id, track, self, self.prediction_frame.H) self.drawn_tracks[track_id] = DrawnTrack(track_id, track, self, self.prediction_frame.H)
logger.warning("Prediction for uninitialised frame. This should not happen? (maybe huge delay in prediction?)")
else: else:
self.drawn_tracks[track_id].set_track(track, self.prediction_frame.H) self.drawn_tracks[track_id].set_prediction(track)
# clean up
for track in self.drawn_tracks.values():
if track.update_at < time.time() - 5:
# TODO fade out
del self.drawn_tracks[track_id]
def on_key_press(self, symbol, modifiers): def on_key_press(self, symbol, modifiers):
print('A key was pressed, use f to hide') print('A key was pressed, use f to hide')
@ -504,9 +575,10 @@ class Renderer:
for shape in track.shapes: for shape in track.shapes:
shape.draw() # for some reason the batches don't work shape.draw() # for some reason the batches don't work
for track in self.drawn_tracks.values(): for track in self.drawn_tracks.values():
for shapes in track.pred_shapes: for prediction in track.predictions:
for shape in shapes: for shapes in prediction.pred_shapes:
shape.draw() for shape in shapes:
shape.draw()
# self.batch_anim.draw() # self.batch_anim.draw()
self.batch_overlay.draw() self.batch_overlay.draw()
@ -668,12 +740,12 @@ def decorate_frame(frame: Frame, prediction_frame: Frame, first_time: float, con
# or https://api.arcade.academy/en/stable/index.html (supports gradient color in line -- "Arcade is built on top of Pyglet and OpenGL.") # or https://api.arcade.academy/en/stable/index.html (supports gradient color in line -- "Arcade is built on top of Pyglet and OpenGL.")
frame.img frame.img
overlay = np.zeros(frame.img.shape, np.uint8) # # Fill image with red color(set each pixel to red)
# Fill image with red color(set each pixel to red) # overlay = np.zeros(frame.img.shape, np.uint8)
overlay[:] = (130, 0, 75) # overlay[:] = (130, 0, 75)
img = cv2.addWeighted(frame.img, .4, overlay, .6, 0) # img = cv2.addWeighted(frame.img, .4, overlay, .6, 0)
# img = frame.img.copy() img = frame.img.copy()
# all not working: # all not working:
# if i == 1: # if i == 1: