Decoupled monitoring and draw iff lines

This commit is contained in:
Ruben van de Ven 2025-05-26 16:14:47 +02:00
parent 010775f5d5
commit 8b9723bdf4
8 changed files with 459 additions and 62 deletions

View file

@ -52,6 +52,7 @@ trap_stage = "trap.stage:Stage.parse_and_start"
trap_prediction = "trap.prediction_server:PredictionServer.parse_and_start"
trap_render_cv = "trap.cv_renderer:CvRenderer.parse_and_start"
trap_monitor = "trap.monitor:Monitor.parse_and_start" # migrate timer
trap_laser_calibration = "trap.laser_calibration:LaserCalibration.parse_and_start" # migrate timer
[tool.uv]

View file

@ -28,7 +28,7 @@ directory=%(here)s
directory=%(here)s
[program:tracker]
command=uv run trap_tracker
command=uv run trap_tracker --smooth-tracks
directory=%(here)s
[program:stage]
@ -40,7 +40,7 @@ command=uv run trap_prediction --eval_device cuda:0 --model_dir EXPERIMENTS/mod
directory=%(here)s
[program:render_cv]
command=uv run trap_render_cv
command=uv run trap_render_cv
directory=%(here)s
environment=DISPLAY=":0"
autostart=false

View file

@ -115,7 +115,7 @@ class CvRenderer(Node):
cv2.namedWindow("frame", cv2.WINDOW_NORMAL)
# https://gist.github.com/ronekko/dc3747211543165108b11073f929b85e
cv2.moveWindow("frame", 1920, -1)
cv2.moveWindow("frame", 0, -1)
if self.config.full_screen:
cv2.setWindowProperty("frame",cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)
# bgsub = cv2.createBackgroundSubtractorMOG2(120, 50, detectShadows=True)
@ -170,10 +170,10 @@ class CvRenderer(Node):
self.out_writer.write(img)
if self.streaming_process:
self.streaming_process.stdin.write(img.tobytes())
if self.config.render_window:
if not self.config.no_window:
cv2.imshow('frame',cv2.resize(img, (1920, 1080)))
# cv2.imshow('frame',img)
cv2.waitKey(1)
cv2.waitKey(10)
# clear out old tracks & predictions:
@ -218,8 +218,8 @@ class CvRenderer(Node):
render_parser.add_argument("--render-file",
help="Render a video file previewing the prediction, and its delay compared to the current frame",
action='store_true')
render_parser.add_argument("--render-window",
help="Render a previewing to a window",
render_parser.add_argument("--no-window",
help="Disable a previewing to a window",
action='store_true')
render_parser.add_argument("--full-screen",

273
trap/laser_calibration.py Normal file
View file

@ -0,0 +1,273 @@
from argparse import ArgumentParser
import json
from pathlib import Path
from typing import Optional
import cv2
import numpy as np
from trap.base import DataclassJSONEncoder, DistortedCamera, Frame
from trap.lines import CoordinateSpace, RenderableLines, SrgbaColor, cross_points
from trap.node import Node
from trap.stage import Coordinate
class LaserCalibration(Node):
"""
A calibrated camera can be used to reverse-map the points of the laser to world coordinates.
Note, it publishes on the address of the stage node, so they cannot run at the same time.
1. Draw points with the laser (use 1-9 to create/select, then position them with arrow keys)
2. Use cursor on camera stream to create an image point for.
- Locate nearby point to select and drag
3. Use image coordinate of point, undistort, homograph, gives world coordinate.
4. Perform homography on world coordinates + laser coordinates
"""
def setup(self):
# self.scenarios: List[DrawnScenario] = []
self.frame_sock = self.sub(self.config.zmq_frame_addr)
self.laser_sock = self.pub(self.config.zmq_stage_addr)
self.camera: Optional[DistortedCamera] = None
self._selected_point = None
self._is_dragging = False
self.laser_points = {}
self.image_points = {}
self.H = None
self.img_size = (1920,1080)
self.frame_img_factor = (1,1)
if self.config.calibfile.exists():
with self.config.calibfile.open('r') as fp:
calibdata = json.load(fp)
self.laser_points = calibdata['laser_points']
self.image_points = calibdata['image_points']
self.H = calibdata['H']
def run(self):
cv2.namedWindow("laser_calib", cv2.WINDOW_NORMAL)
# https://gist.github.com/ronekko/dc3747211543165108b11073f929b85e
# cv2.moveWindow("laser_calib", 0, -1)
cv2.setMouseCallback('laser_calib',self.mouse_event)
cv2.setWindowProperty("laser_calib",cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)
# arrow up (82), down (84), arrow left(81)
frame = None
while self.run_loop_capped_fps(60):
if self.frame_sock.poll(0):
frame: Frame = self.frame_sock.recv_pyobj()
if not self.camera:
self.camera = frame.camera
if frame is None:
continue
self.frame_img_factor = frame.img.shape[1] / self.img_size[0], frame.img.shape[0] / self.img_size[1]
img = frame.img
img = cv2.resize(img, self.img_size)
cv2.putText(img, 'press 1-0 to create/edit points', (10,20), cv2.FONT_HERSHEY_SIMPLEX, .5, (255,255,255))
if len(self.laser_points) < 4:
cv2.putText(img, 'add points to calculate homography', (10,40), cv2.FONT_HERSHEY_SIMPLEX, .5, (255,255,255))
else:
cv2.putText(img, 'press c to calculate homography', (10,40), cv2.FONT_HERSHEY_SIMPLEX, .5, (255,255,0))
cv2.putText(img, str(self.config.calibfile), (10,self.img_size[1]-30), cv2.FONT_HERSHEY_SIMPLEX, .5, (255,255,0))
if self._selected_point:
color = (0,255,255)
cv2.putText(img, f'selected {self._selected_point}', (10,60), cv2.FONT_HERSHEY_SIMPLEX, .5, color)
cv2.putText(img, 'press d to delete', (10,80), cv2.FONT_HERSHEY_SIMPLEX, .5, color)
cv2.putText(img, 'use arrows to position laser for this point', (10,100), cv2.FONT_HERSHEY_SIMPLEX, .5, color)
target = self.camera.points_img_to_world([self.image_points[self._selected_point]])[0].tolist()
target = round(target[0], 2), round(target[1], 2)
cv2.putText(img, f'map {self.laser_points[self._selected_point]} to {target} ({self.image_points[self._selected_point]})', (10,120), cv2.FONT_HERSHEY_SIMPLEX, .5, color)
for k, coord in self.image_points.items():
color = (0,0,255) if self._selected_point == k else (255,0,0)
coord = int(coord[0] / self.frame_img_factor[0]), int(coord[1] / self.frame_img_factor[1])
cv2.circle(img, coord, 4, color, thickness=2)
cv2.putText(img, str(k), (coord[0]+10, coord[1]), cv2.FONT_HERSHEY_SIMPLEX, .5, color)
key = cv2.waitKey(5) # or for arrows: full_key_code = cv2.waitKeyEx(0)
self.key_event(key)
# nr_keys = [ord(i) for i in range(10)] # select/add point
# cv2.
cv2.imshow('laser_calib', img)
lines = []
if self._selected_point:
point = self.laser_points[self._selected_point]
lines.extend(cross_points(point[0], point[1], 100, SrgbaColor(0,1,0,1)))
# render in laser space
rl = RenderableLines(lines, CoordinateSpace.LASER)
self.laser_sock.send_json(rl, cls=DataclassJSONEncoder)
# print(json.dumps(rl, cls=DataclassJSONEncoder))
def key_event(self, key: int):
if key < 0:
return
if key == ord('q'):
exit()
if key == 27: #esc
self._selected_point = None
if key == ord('c'):
self.calculate_homography()
self.save()
if key == ord('d') and self._selected_point:
self.delete_point(self._selected_point)
# arrow up (82), down (84), arrow left(81)
if self._selected_point and key in [81, 84, 82, 83,
ord('h'), ord('j'), ord('k'), ord('l'),
ord('H'), ord('J'), ord('K'), ord('L'),
]:
diff = [0,0]
if key in [81, ord('h')]:
diff[0] -= 1
if key == ord('H'):
diff[0] -= 10
if key in [83, ord('l')]:
diff[0] += 1
if key == ord('L'):
diff[0] += 10
if key in [82, ord('k')]:
diff[1] += 1
if key == ord('K'):
diff[1] += 10
if key in [84, ord('j')]:
diff[1] -= 1
if key == ord('J'):
diff[1] -= 10
self.laser_points[self._selected_point] = (
self.laser_points[self._selected_point][0] + diff[0],
self.laser_points[self._selected_point][1] + diff[1],
)
nr_keys = [ord(str(i)) for i in range(10)]
if key in nr_keys:
select = str(nr_keys.index(key))
self.create_or_select(select)
def mouse_event(self, event,x,y,flags,param):
x *= self.frame_img_factor[0]
y *= self.frame_img_factor[1]
if event == cv2.EVENT_MOUSEMOVE:
if not self._is_dragging or not self._selected_point:
return
self.image_points[self._selected_point] = (x, y)
if event == cv2.EVENT_LBUTTONDOWN:
# select or create
self._selected_point = None
for i, p in self.image_points.items():
d = (p[0]-x)**2 + (p[1]-y)**2
if d < 30:
self._selected_point = i
break
if self._selected_point is None:
self._selected_point = self.new_point((x,y), None)
self._is_dragging = True
if event == cv2.EVENT_LBUTTONUP:
self._is_dragging = False
# ... point stays selected to tweak laser
def create_or_select(self, nr: str):
if nr not in self.image_points:
self.new_point(None, None, nr)
self._selected_point = nr
return nr
def new_point(self, img_coord: Optional[Coordinate], laser_coord: Optional[Coordinate], nr: Optional[str]=None):
if nr:
new_nr = nr
else:
new_nr = None
for i in range(100):
k = str(i)
if k not in self.image_points:
new_nr = k
break
if not new_nr:
new_nr = 0 # cover unlikely case
self.image_points[new_nr] = img_coord or (100,100)
self.laser_points[new_nr] = laser_coord or (100,100)
return new_nr
def delete_point(self, point: str):
del self.image_points[point]
del self.laser_points[point]
self._selected_point = None
def calculate_homography(self):
if len(self.image_points) < 4:
return
world_points = self.camera.points_img_to_world(list(self.image_points.values()))
laser_points = np.array(list(self.laser_points.values()))
print('from', world_points)
print('to', laser_points)
self.H, status = cv2.findHomography(world_points, laser_points)
print('Found')
print(self.H)
def save(self):
with self.config.calibfile.open('w') as fp:
json.dump({
'laser_points': self.laser_points,
'image_points': self.image_points,
'H': self.H.tolist()
}, fp)
@classmethod
def arg_parser(cls) -> ArgumentParser:
argparser = ArgumentParser()
argparser.add_argument('--zmq-frame-addr',
help='Manually specity communication addr for the frame messages',
type=str,
default="ipc:///tmp/feeds_frame")
argparser.add_argument('--zmq-stage-addr',
help='Manually specity communication addr for the stage messages (the rendered lines)',
type=str,
default="tcp://0.0.0.0:99174")
argparser.add_argument('--calibfile',
help='specify file to save & load points with',
type=Path,
default=Path("./laser_calib.json"))
return argparser

View file

@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from enum import Enum, IntEnum
import math
from typing import List, Tuple
import numpy as np
@ -13,6 +13,12 @@ See [notebook](../test_path_transforms.ipynb) for examples
"""
RenderablePosition = Tuple[float,float]
class CoordinateSpace(IntEnum):
CAMERA = 1
UNDISTORTED_CAMERA = 2
WORLD = 3
LASER = 4
@dataclass
class SrgbaColor():
@ -55,12 +61,12 @@ class SimplifyMethod(Enum):
class RenderableLine():
points: List[RenderablePoint]
def as_simplified(self, method: SimplifyMethod = SimplifyMethod.RDP):
def as_simplified(self, method: SimplifyMethod = SimplifyMethod.RDP, factor = SIMPLIFY_FACTOR_RDP):
linestring = [p.position for p in self.points]
if method == SimplifyMethod.RDP:
indexes = simplify_coords_idx(linestring, SIMPLIFY_FACTOR_RDP)
indexes = simplify_coords_idx(linestring, factor)
elif method == SimplifyMethod.VW:
indexes = simplify_coords_vw_idx(linestring, SIMPLIFY_FACTOR_VW)
indexes = simplify_coords_vw_idx(linestring, factor)
points = [self.points[i] for i in indexes]
return RenderableLine(points)
@ -68,11 +74,12 @@ class RenderableLine():
@dataclass
class RenderableLines():
lines: List[RenderableLine]
space: CoordinateSpace = CoordinateSpace.WORLD
def as_simplified(self, method: SimplifyMethod = SimplifyMethod.RDP):
def as_simplified(self, method: SimplifyMethod = SimplifyMethod.RDP, factor = SIMPLIFY_FACTOR_RDP):
"""Wraps RenderableLine simplification"""
return RenderableLines(
[line.as_simplified(method) for line in self.lines]
[line.as_simplified(method, factor) for line in self.lines]
)
def append(self, rl: RenderableLine):
@ -81,6 +88,9 @@ class RenderableLines():
def append_lines(self, rls: RenderableLines):
self.lines.extend(rls.lines)
def point_count(self):
return sum([len(l.points) for l in self.lines])
# def merge(self, rl: RenderableLines):
@ -91,7 +101,7 @@ def circle_arc(cx, cy, r, t, l, c: SrgbaColor):
for l*2pi, offset by t. Both t and l are 0<= [t,l] <= 1
"""
resolution = 40
resolution = 30
steps = int(resolution * l)
offset = int(resolution * t)
pointlist: list[RenderablePoint] = []
@ -102,4 +112,24 @@ def circle_arc(cx, cy, r, t, l, c: SrgbaColor):
pointlist.append(RenderablePoint((x, y), c))
return RenderableLine(pointlist)
return RenderableLine(pointlist)
def cross_points(cx, cy, r, c: SrgbaColor):
# r = 100
steps = 3
pointlist: list[RenderablePoint] = []
for i in range(steps):
x = int(cx)
y = int(cy + r - i * 2 * r/steps)
pos = (x, y)
pointlist.append(RenderablePoint(pos, c))
path = RenderableLine(pointlist)
pointlist: list[RenderablePoint] = []
for i in range(steps):
y = int(cy)
x = int(cx + r - i * 2 * r/steps)
pos = (x, y)
pointlist.append(RenderablePoint(pos, c))
path2 = RenderableLine(pointlist)
return [path, path2]

65
trap/monitor.py Normal file
View file

@ -0,0 +1,65 @@
from argparse import ArgumentParser
import time
from trap.counter import CounterListerner
from trap.node import Node
class Monitor(Node):
"""
Render a stage, on which different TrackScenarios take place to a
single image of lines. Which can be passed to different renderers
E.g. the laser or image renderers.
"""
FPS = 1
def setup(self):
# self.scenarios: List[DrawnScenario] = []
self.counter_listener = CounterListerner()
def run(self):
prev_time = time.perf_counter()
while self.is_running.is_set():
# self.tick() # don't polute it with own data
self.counter_listener.snapshot()
stats = self.counter_listener.to_string()
if len(stats):
self.logger.info(stats)
# else:
# self.logger.info("no stats")
# for i, (k, v) in enumerate(self.counter_listener.get_latest().items()):
# print(k,v)
# cv2.putText(img, f"{k} {v.value()}", (20,img.shape[0]-(40*i)-40), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1)
# 3) calculate latency for desired FPS
now = time.perf_counter()
time_diff = (now - prev_time)
if time_diff < 1/self.FPS:
# print(f"sleep {1/self.FPS - time_diff}")
time.sleep(1/self.FPS - time_diff)
now += 1/self.FPS - time_diff
prev_time = now
@classmethod
def arg_parser(cls) -> ArgumentParser:
argparser = ArgumentParser()
# argparser.add_argument('--zmq-trajectory-addr',
# help='Manually specity communication addr for the trajectory messages',
# type=str,
# default="ipc:///tmp/feeds_traj")
# argparser.add_argument('--zmq-prediction-addr',
# help='Manually specity communication addr for the prediction messages',
# type=str,
# default="ipc:///tmp/feeds_preds")
# argparser.add_argument('--zmq-stage-addr',
# help='Manually specity communication addr for the stage messages (the rendered lines)',
# type=str,
# default="tcp://0.0.0.0:99174")
return argparser

View file

@ -3,6 +3,7 @@ from logging.handlers import QueueHandler, QueueListener, SocketHandler
import multiprocessing
from multiprocessing.synchronize import Event as BaseEvent
from argparse import ArgumentParser, Namespace
import time
from typing import Optional
import zmq
@ -19,6 +20,8 @@ class Node():
self.zmq_context = zmq.Context()
self.logger = self._logger()
self._prev_loop_time = 0
self.setup()
@classmethod
@ -43,6 +46,21 @@ class Node():
self.tick()
return self.is_running.is_set()
def run_loop_capped_fps(self, max_fps: float):
"""Use in run(), to check if it should keep looping
Takes care of tick()'ing the iterations/second counter
"""
now = time.perf_counter()
time_diff = (now - self._prev_loop_time)
if time_diff < 1/max_fps:
# print(f"sleep {1/max_fps - time_diff}")
time.sleep(1/max_fps - time_diff)
now += 1/max_fps - time_diff
self._prev_loop_time = now
return self.run_loop()
@classmethod
def arg_parser(cls) -> ArgumentParser:
raise RuntimeError("Not implemented arg_parser()")

View file

@ -24,7 +24,7 @@ from trap import shapes
from trap.base import Camera, DataclassJSONEncoder, DistortedCamera, Frame, ProjectedTrack, Track
from trap.counter import CounterSender
from trap.laser_renderer import circle_points, rotateMatrix
from trap.lines import RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SrgbaColor, circle_arc
from trap.lines import RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SimplifyMethod, SrgbaColor, circle_arc
from trap.node import Node
from trap.timer import Timer
from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointToPolar, relativePolarToPoint
@ -56,6 +56,9 @@ class AppendableLine(LineGenerator):
self.ready = len(self.points) == 0
self.draw_decay_speed = draw_decay_speed
def nr_of_passed_points(self):
"""The number of points passed in the animation"""
return len(self._drawn_points) - 1
def update_drawn_positions(self, dt: DeltaT):
if len(self.points) == 0:
@ -164,6 +167,7 @@ class DiffSegment():
Scenario's anomaly score.
"""
DRAW_DECAY_SPEED = 25
POINT_INTERVAL = 4
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
@ -177,6 +181,12 @@ class DiffSegment():
def finish(self):
self.finished = True
def nr_of_passed_points(self):
if isinstance(self.line, AppendableLine):
return self.line.nr_of_passed_points() * self.POINT_INTERVAL
else:
return len(self.points) * self.POINT_INTERVAL
# run on each track update received
def update_track(self, track: ProjectedTrack):
@ -204,7 +214,7 @@ class DiffSegment():
line = []
for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)):
offset_from_start = (pred_diff_steps_forward + i)
if offset_from_start % 4 == 0:
if offset_from_start % self.POINT_INTERVAL == 0:
self.line.points.extend([p1, p2])
self.points.extend([p1, p2])
@ -222,33 +232,6 @@ class DiffSegment():
if isinstance(self.line, ProceduralChain):
self.line.target = self._target_track.projected_history[-1]
# if len(self.points) == 0:
# # nothing to draw yet
# return
# # self._drawn_points = self.points
# if len(self._drawn_points) == 0:
# # create origin
# self._drawn_points.append(self.points[0])
# # and drawing head
# self._drawn_points.append(self.points[0])
# idx = len(self._drawn_points) - 1
# target = self.points[idx]
# if np.isclose(self._drawn_points[-1], target, atol=.05).all():
# # TODO: might want to migrate to np.isclose()
# if len(self._drawn_points) == len(self.points):
# self.ready = True
# return # done until a new point is added
# # add new point as drawing head
# self._drawn_points.append(self._drawn_points[-1])
# self.ready = False
# x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.DRAW_DECAY_SPEED, dt, .05)
# y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.DRAW_DECAY_SPEED, dt, .05)
# self._drawn_points[-1] = (float(x), float(y))
# if not self.finished or not self.line.ready:
self.line.update_drawn_positions(dt)
@ -284,8 +267,8 @@ PREDICTION_FADE_SLOPE: float = -10
PREDICTION_FADE_AFTER_DURATION: float = 10 # seconds
PREDICTION_END_FADE = 2 #frames
# TRACK_MAX_POINTS = 100
TRACK_FADE_AFTER_DURATION = 10. # seconds
TRACK_END_FADE = 50 # points
TRACK_FADE_AFTER_DURATION = 8. # seconds
TRACK_END_FADE = 30 # points
TRACK_FADE_ASSUME_FPS = 12
# Don't render the first n points of the prediction,
@ -309,6 +292,7 @@ class TrackScenario(StateMachine):
receive_prediction = detected.to(first_prediction) | substantial.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing")
def __init__(self):
self.track: ProjectedTrack = None
self.camera: Optional[Camera] = None
@ -341,7 +325,7 @@ class TrackScenario(StateMachine):
return False
def check_lost(self):
if self.current_state is not self.lost and self.track and self.track.created_at < time.time() - 5:
if self.current_state is not self.lost and self.track and self.track.updated_at < time.time() - 5:
self.mark_lost()
def set_track(self, track: ProjectedTrack):
@ -464,8 +448,8 @@ class DrawnScenario(TrackScenario):
"""
ANOMALY_DECAY = .2 # speed with which the cirlce shrinks over time
DISTANCE_ANOMALY_FACTOR = .05 # the ammount to which the difference counts to the anomaly score
MAX_HISTORY = 80 # points of history of trajectory to display (preventing too long lines)
DISTANCE_ANOMALY_FACTOR = .03 # the ammount to which the difference counts to the anomaly score
MAX_HISTORY = 100 # points of history of trajectory to display (preventing too long lines)
CUT_GAP = 5 # when adding a new prediction, keep the existing prediction until that point + this CUT_GAP margin
def __init__(self):
@ -525,7 +509,8 @@ class DrawnScenario(TrackScenario):
# 1. track history, direct update
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:]
# self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:]
self.drawn_positions = self.track.projected_history
# 3. predictions
if len(self.drawn_predictions) < len(self.predictions):
@ -533,21 +518,24 @@ class DrawnScenario(TrackScenario):
if len(self.drawn_predictions) == 0:
self.drawn_predictions.append(self.predictions[-1].predictions[0])
else:
# cut existing prediction
end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP
# print(end_step)
# if a new prediction has arised, transition from existing one.
# First, cut existing prediction
# CUT_GAP indicates that some is lost in the transition, to prevent glitches when velocity of person changes
end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP
keep = self.drawn_predictions[-1][end_step:]
last_item: Coordinate = keep[-1]
self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step]
self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step] # cut the old part
# print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep))
ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep))
# print(ext)
# duplicate last item, so the new one has the same nr. of points as the incoming prediction (so it can actually transition)
ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep))
keep.extend(ext)
self.drawn_predictions.append(keep)
for a, drawn_prediction in enumerate(self.drawn_predictions):
# origin = self.predictions[a].predictions[0][0]
origin = self.predictions[a].predictions[0][0]
# associated_diff = self.prediction_diffs[a]
# progress = associated_diff.nr_of_passed_points()
for i, pos in enumerate(drawn_prediction):
# TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
decay = max(3, (18/i) if i else 10) # points further away move with more delay
@ -624,7 +612,7 @@ class DrawnScenario(TrackScenario):
def to_renderable_lines(self) -> RenderableLines:
t = time.time()
track_age = t - self.track.created_at
track_age = t - self.track.updated_at # Should be beginning
lines = RenderableLines([])
@ -633,8 +621,11 @@ class DrawnScenario(TrackScenario):
# track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames
# 1. Trajectory history
# drawable_points, alphas = self.drawn_positions[:self.MAX_HISTORY], [1]*len(self.drawn_positions)
drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points))
@ -642,9 +633,21 @@ class DrawnScenario(TrackScenario):
anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout
# lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color))
# last point, (but this draws line in circle, requiring a 'jump back' for the laser)
cx, cy = self.drawn_positions[-1][0], self.drawn_positions[-1][1],
radius = max(.1, self._drawn_anomaly_score * 1.)
steps=5
if len(self.drawn_positions) >= steps:
dx, dy = self.drawn_positions[-1][0] - self.drawn_positions[-steps][0], self.drawn_positions[-1][1] - self.drawn_positions[-steps][1],
diff = np.array([dx,dy])
diff = diff/np.linalg.norm(diff) * radius * 1.1
cx += diff[0]
cy += diff[1]
lines.append(circle_arc(
self.drawn_positions[-1][0], self.drawn_positions[-1][1],
max(.1, self._drawn_anomaly_score * 1.),
cx, cy,
radius,
0, 1,
anomaly_marker_color)
)
@ -655,7 +658,11 @@ class DrawnScenario(TrackScenario):
prediction_track_age = time.time() - self.predictions[0].created_at
t_factor = prediction_track_age / PREDICTION_FADE_IN
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
for drawn_prediction in self.drawn_predictions:
for a, drawn_prediction in enumerate(self.drawn_predictions):
associated_diff = self.prediction_diffs[a]
progress = associated_diff.nr_of_passed_points()
# drawn_prediction, alphas1 = points_fade_out_alpha_mask(drawn_prediction, prediction_track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE, no_frame_max=True)
@ -674,6 +681,7 @@ class DrawnScenario(TrackScenario):
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])]
points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction, colors)]
points = points[progress//2:]
lines.append(RenderableLine(points))
# 4. Diffs
@ -870,10 +878,12 @@ class Stage(Node):
# rl = RenderableLines(lines)
# with open('/tmp/lines.pcl', 'wb') as fp:
# pickle.dump(rl, fp)
rl = lines.as_simplified() # or segmentise (see shapely)
rl = lines.as_simplified(SimplifyMethod.RDP, .01) # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines.lines))
self.counter.set("stage.points_orig", lines.point_count())
self.counter.set("stage.points", rl.point_count())
# print(rl.__dict__)
self.stage_sock.send_json(rl, cls=DataclassJSONEncoder)
self.stage_sock.send_json(obj=rl, cls=DataclassJSONEncoder)
# print(json.dumps(rl, cls=DataclassJSONEncoder))