Compare commits

..

4 commits

Author SHA1 Message Date
Ruben van de Ven
bd00e4fbd6 Calibration tweaks 2025-07-11 14:28:50 +02:00
Ruben van de Ven
0e29371e94 optional camera 2025-07-11 14:28:25 +02:00
Ruben van de Ven
12852c3ae6 no point in autostarting monitor 2025-07-11 14:27:52 +02:00
Ruben van de Ven
f2d7c65a18 noisy squigly lines 2025-07-11 14:27:23 +02:00
7 changed files with 232 additions and 18 deletions

View file

@ -35,6 +35,7 @@ dependencies = [
"simplification>=0.7.12",
"supervisor>=4.2.5",
"superfsmon>=1.2.3",
"noise>=1.2.2",
]
[project.scripts]

View file

@ -20,6 +20,7 @@ serverurl = http://localhost:8293
command=uv run trap_monitor
numprocs=1
directory=%(here)s
autostart=false
[program:video]
command=uv run trap_video_source --homography ../DATASETS/hof3/homography.json --video-src ../DATASETS/hof3/hof3-cam-demo-twoperson.mp4 --calibration ../DATASETS/hof3/calibration.json --video-loop

View file

@ -1,19 +1,24 @@
from argparse import ArgumentParser
import enum
import json
from pathlib import Path
import time
from typing import Optional
import cv2
import numpy as np
from trap.base import DataclassJSONEncoder, DistortedCamera, Frame
from trap.lines import CoordinateSpace, RenderableLines, SrgbaColor, cross_points
from trap.lines import CoordinateSpace, RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SrgbaColor, cross_points
from trap.node import Node
from trap.stage import Coordinate
class Modes(enum.Enum):
POINTS = 1
TEST_LINE = 2
class LaserCalibration(Node):
"""
@ -39,6 +44,7 @@ class LaserCalibration(Node):
self._is_dragging = False
self.laser_points = {}
self.image_points = {}
self.mode = Modes.POINTS
self.H = None
self.img_size = (1920,1080)
@ -111,13 +117,22 @@ class LaserCalibration(Node):
cv2.imshow('laser_calib', img)
lines = []
if self._selected_point:
point = self.laser_points[self._selected_point]
lines.extend(cross_points(point[0], point[1], 100, SrgbaColor(0,1,0,1)))
if self.mode == Modes.TEST_LINE:
lines.append(RenderableLine([
RenderablePoint((i,time.time()%18), SrgbaColor(0,1,0,1)) for i in range(-15, 40)
]))
# render in laser space
rl = RenderableLines(lines, CoordinateSpace.WORLD)
self.laser_sock.send_json(rl, cls=DataclassJSONEncoder)
else:
if self._selected_point:
point = self.laser_points[self._selected_point]
lines.extend(cross_points(point[0], point[1], 100, SrgbaColor(0,1,0,1)))
# render in laser space
rl = RenderableLines(lines, CoordinateSpace.LASER)
self.laser_sock.send_json(rl, cls=DataclassJSONEncoder)
# render in laser space
rl = RenderableLines(lines, CoordinateSpace.LASER)
self.laser_sock.send_json(rl, cls=DataclassJSONEncoder)
# print(json.dumps(rl, cls=DataclassJSONEncoder))
@ -138,6 +153,10 @@ class LaserCalibration(Node):
if key == ord('d') and self._selected_point:
self.delete_point(self._selected_point)
if key == ord('t'):
self.mode = Modes.TEST_LINE if self.mode == Modes.POINTS else Modes.POINTS
print(self.mode)
# arrow up (82), down (84), arrow left(81)
if self._selected_point and key in [81, 84, 82, 83,

View file

@ -77,7 +77,7 @@ class RenderableLines():
space: CoordinateSpace = CoordinateSpace.WORLD
def as_simplified(self, method: SimplifyMethod = SimplifyMethod.RDP, factor = SIMPLIFY_FACTOR_RDP):
"""Wraps RenderableLine simplification"""
"""Wraps RenderableLine simplification, smaller factor is more detailed"""
return RenderableLines(
[line.as_simplified(method, factor) for line in self.lines]
)

View file

@ -29,12 +29,16 @@ from trap.node import Node
from trap.timer import Timer
from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointToPolar, relativePolarToPoint
from noise import snoise2
logger = logging.getLogger('trap.stage')
Coordinate = Tuple[float, float]
DeltaT = float # delta_t in seconds
OPTION_GROW_ANOMALY_CIRCLE = False
OPTION_RENDER_DIFF_SEGMENT = False
class LineGenerator(ABC):
@abstractmethod
def update_drawn_positions(self, dt: DeltaT):
@ -252,6 +256,69 @@ class DiffSegment():
return RenderableLines([])
class DiffSegmentScan(DiffSegment):
"""
Provide alternative diffing, in the form of a sort of scan line
Should be faster with the laser
TODO: This is work in progress, does not work yet!
"""
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._target_track = prediction
self.finished = False
self._last_diff_frame_idx = 0
def finish(self):
self.finished = True
def prediction_offset(self):
"""Difference is starting moment between track and prediction"""
return self.ptrack.frame_index - self._target_track.frame_index
def nr_of_passed_points(self):
"""Number of points of the given ptrack that have passed"""
return len(self._target_track.projected_history) - 1 - self.prediction_offset()
# len(self.points) * self.POINT_INTERVAL
# run on each track update received
def update_track(self, track: ProjectedTrack):
self._target_track = track
if self.finished:
# don't add new points if finished
return
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
self._last_diff_frame_idx = track.frame_index
# run each render tick
def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario):
# if not self.finished or not self.line.ready:
# self.line.update_drawn_positions(dt)
pass # TODO: use easing
def as_renderable(self) -> RenderableLines:
if self.finished:
return RenderableLines([])
color = SrgbaColor(0,0,1,1)
# steps_diff = self.nr_of_passed_points()
idx = self.nr_of_passed_points()
if len(self.ptrack.predictions[0]) < idx+1:
self.finish()
return RenderableLines([])
points = [self._target_track.projected_history[-1], self.ptrack.predictions[0][idx]]
points = [RenderablePoint(pos, color) for pos in points]
line = RenderableLine(points)
return RenderableLines([line])
class ScenarioScene(Enum):
DETECTED = 1
FIRST_PREDICTION = 2
@ -367,6 +434,10 @@ class TrackScenario(StateMachine):
if PREDICTION_INTERVAL is not None and len(self.predictions) and (track.frame_index - self.predictions[-1].frame_index) < PREDICTION_INTERVAL:
# just drop tracks if the predictions come to quick
return
if track._track.predictions is None or not len(track._track.predictions):
# don't count to predictions if no prediction is set of given track (e.g. young tracks)
return
self.predictions.append(track)
@ -374,6 +445,7 @@ class TrackScenario(StateMachine):
self.prediction_diffs[-1].finish() # existing diffing can end
# and create a new one
self.prediction_diffs.append(DiffSegment(track))
# self.prediction_diffs.append(DiffSegmentScan(track))
# check to change state
try:
@ -457,6 +529,7 @@ class DrawnScenario(TrackScenario):
# self.track_id = track_id
self.last_update_t = time.perf_counter()
self.drawn_position: Optional[Coordinate] = None
self.drawn_positions: List[Coordinate] = []
self.drawn_pred_history: List[Coordinate] = []
self.drawn_predictions: List[List[Coordinate]] = []
@ -511,19 +584,25 @@ class DrawnScenario(TrackScenario):
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
# self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:]
self.drawn_positions = self.track.projected_history
if self.drawn_position is None:
self.drawn_position = self.drawn_positions[-1]
else:
self.drawn_position[0] = exponentialDecay(self.drawn_position[0], self.drawn_positions[-1][0], 3, dt)
self.drawn_position[1] = exponentialDecay(self.drawn_position[1], self.drawn_positions[-1][1], 3, dt)
# 3. predictions
if len(self.drawn_predictions) < len(self.predictions):
# first prediction
if len(self.drawn_predictions) == 0:
self.drawn_predictions.append(self.predictions[-1].predictions[0])
last_pred = self.predictions[-1]
self.drawn_predictions.append(last_pred.predictions[0])
else:
# if a new prediction has arised, transition from existing one.
# First, cut existing prediction
# CUT_GAP indicates that some is lost in the transition, to prevent glitches when velocity of person changes
end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP
keep = self.drawn_predictions[-1][end_step:]
last_item: Coordinate = keep[-1]
last_item: Coordinate = (keep)[-1]
self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step] # cut the old part
# print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep))
# duplicate last item, so the new one has the same nr. of points as the incoming prediction (so it can actually transition)
@ -622,11 +701,21 @@ class DrawnScenario(TrackScenario):
# 1. Trajectory history
# drawable_points, alphas = self.drawn_positions[:self.MAX_HISTORY], [1]*len(self.drawn_positions)
# perlin/simplex noise
# dt: change speed. Divide to make slower
# amp: amplitude of noise
# frequency: make smaller to make longer waves
noisy_points = apply_perlin_noise_to_line_normal(self.drawn_positions, t/3, .3, .05)
drawable_points, alphas = points_fade_out_alpha_mask(noisy_points, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,1.,1.-self.lost_factor())
drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
# TODO: effect configuration
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
# points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points))
# 2. Position Marker / anomaly score
@ -635,7 +724,8 @@ class DrawnScenario(TrackScenario):
# lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color))
# last point, (but this draws line in circle, requiring a 'jump back' for the laser)
cx, cy = self.drawn_positions[-1][0], self.drawn_positions[-1][1],
radius = max(.1, self._drawn_anomaly_score * 1.)
radius = max(.1, self._drawn_anomaly_score * 1.) if OPTION_GROW_ANOMALY_CIRCLE else .1
steps=5
if len(self.drawn_positions) >= steps:
@ -690,8 +780,10 @@ class DrawnScenario(TrackScenario):
# colors = [color.as_faded(1) for a2 in range(len(drawn_diff))]
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)]
# lines.append(RenderableLine(points))
for diff in self.prediction_diffs:
lines.append_lines(diff.as_renderable())
if OPTION_RENDER_DIFF_SEGMENT:
for diff in self.prediction_diffs:
lines.append_lines(diff.as_renderable())
# # print(self.current_state)
@ -878,7 +970,7 @@ class Stage(Node):
# rl = RenderableLines(lines)
# with open('/tmp/lines.pcl', 'wb') as fp:
# pickle.dump(rl, fp)
rl = lines.as_simplified(SimplifyMethod.RDP, .01) # or segmentise (see shapely)
rl = lines.as_simplified(SimplifyMethod.RDP, .003) # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines.lines))
self.counter.set("stage.points_orig", lines.point_count())
self.counter.set("stage.points", rl.point_count())
@ -905,3 +997,89 @@ class Stage(Node):
return argparser
# TODO place somewhere else:
# Gemma3:27b prompt: "python. Given a list of coordinates, that describes a line: `drawable_points: List[Tuple[float,float]]` apply perlin noise over the normal of the line, that changes over time `dt`."
def apply_perlin_noise_to_line_normal(drawable_points: np.ndarray, dt: float, amplitude: float = 1.0, frequency: float = 1.0) -> np.ndarray:
"""
Applies Perlin noise to the normals of a line described by a list of coordinates, changing over time.
Args:
drawable_points: A list of (x, y) tuples representing the points of the line.
dt: The time delta, used to animate the Perlin noise.
amplitude: The strength of the Perlin noise effect.
frequency: The frequency of the Perlin noise (how many waves per unit).
Returns:
A new list of (x, y) tuples representing the line with Perlin noise applied to the normals. If drawable_points
has fewer than 2 points, it returns the original list unchanged.
Raises:
TypeError: If drawable_points is not a list or dt is not a float.
ValueError: If the input points are not tuples of length 2.
"""
# if not isinstance(drawable_points, list):
# print(drawable_points, type(drawable_points))
# raise TypeError("drawable_points must be a list.")
if not isinstance(dt, float):
raise TypeError("dt must be a float.")
if len(drawable_points) < 2:
return drawable_points # Nothing to do with fewer than 2 points
# for point in drawable_points:
# if not isinstance(point, tuple) or len(point) != 2:
# raise ValueError("Each point in drawable_points must be a tuple of length 2.")
# noise = PerlinNoise(octaves=4) # You can adjust octaves for different noise patterns
new_points = []
for i in range(len(drawable_points)):
x, y = drawable_points[i]
# Calculate the normal vector. We'll approximate it using the previous and next points.
if i == 0:
# For the first point, use the next point to estimate the normal
next_x, next_y = drawable_points[i + 1]
normal_x = next_y - y
normal_y = -(next_x - x)
elif i == len(drawable_points) - 1:
# For the last point, use the previous point
prev_x, prev_y = drawable_points[i - 1]
normal_x = y - prev_y
normal_y = -(x - prev_x)
else:
prev_x, prev_y = drawable_points[i - 1]
next_x, next_y = drawable_points[i + 1]
normal_x = next_y - prev_y
normal_y = -(next_x - prev_x)
# Normalize the normal vector
norm = np.sqrt(normal_x**2 + normal_y**2)
if norm > 0:
normal_x /= norm
normal_y /= norm
# Apply Perlin noise to the normal
# noise_x = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_x
# noise_y = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_y
noise = snoise2(i * frequency, dt % 1000, octaves=4)
noise_x = noise * amplitude * normal_x
noise_y = noise * amplitude * normal_y
# print(noise_x, noise_y, dt, frequency, i, dt, snoise2(i * frequency, dt % 1000, octaves=4))
# Add the noise to the point's coordinates
new_x = x + noise_x
new_y = y + noise_y
new_points.append((new_x, new_y))
# print(drawable_points, new_points)
return np.array(new_points)

View file

@ -124,7 +124,10 @@ class SingleCvVideoSource(VideoSource):
class RtspSource(SingleCvVideoSource):
def __init__(self, video_url: str | Path, camera: Camera = None):
gst = f"rtspsrc location={video_url} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=0 drop=true"
# keep max 1 frame in app-buffer (0 = unlimited)
# When using gstreamer 1.28 drop=true is deprecated, use: leaky-type=2 which frame to drop: https://gstreamer.freedesktop.org/documentation/applib/gstappsrc.html?gi-language=c
gst = f"rtspsrc location={video_url} latency=0 buffer-mode=auto ! decodebin ! videoconvert ! appsink max-buffers=1 drop=true"
logger.info(f"Capture gstreamer (gst-launch-1.0): {gst}")
self.video = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)
self.frame_idx = 0
@ -209,7 +212,7 @@ class CameraSource(SingleCvVideoSource):
self.video.set(cv2.CAP_PROP_FPS, self.camera.fps)
self.frame_idx = 0
def get_video_source(video_sources: List[UrlOrPath], camera: Camera, frame_offset=0, frame_end:Optional[int]=None, loop=False):
def get_video_source(video_sources: List[UrlOrPath], camera: Optional[Camera] = None, frame_offset=0, frame_end:Optional[int]=None, loop=False):
if str(video_sources[0]).isdigit():
# numeric input is a CV camera
@ -230,3 +233,7 @@ def get_video_source(video_sources: List[UrlOrPath], camera: Camera, frame_offse
return FilelistSource(video_sources, offset = frame_offset, end=frame_end, loop=loop)
# os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "fflags;nobuffer|flags;low_delay|avioflags;direct|rtsp_transport;udp"
def get_video_source_from_str(video_sources: List[str]):
paths = [UrlOrPath(s) for s in video_sources]
return get_video_source(paths)

View file

@ -1300,6 +1300,12 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/5e/cb3dbdf3ae18e281b8b1b4691bb5d3465b383e04bde2c2a782c893f1ee21/nicegui-2.13.0-py3-none-any.whl", hash = "sha256:2343d37885df2c2e388a4f4c3f0ce9b308be02e16b0303108471a1a38fe3508f", size = 16482500 },
]
[[package]]
name = "noise"
version = "1.2.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/18/29/bb830ee6d934311e17a7a4fa1368faf3e73fbb09c0d80fc44e41828df177/noise-1.2.2.tar.gz", hash = "sha256:57a2797436574391ff63a111e852e53a4164ecd81ad23639641743cd1a209b65", size = 125615 }
[[package]]
name = "notebook"
version = "7.3.3"
@ -2522,6 +2528,7 @@ dependencies = [
{ name = "gdown" },
{ name = "ipywidgets" },
{ name = "jsonlines" },
{ name = "noise" },
{ name = "opencv-python" },
{ name = "pandas-helper-calc" },
{ name = "pyglet" },
@ -2556,6 +2563,7 @@ requires-dist = [
{ name = "gdown", specifier = ">=4.7.1,<5" },
{ name = "ipywidgets", specifier = ">=8.1.5,<9" },
{ name = "jsonlines", specifier = ">=4.0.0,<5" },
{ name = "noise", specifier = ">=1.2.2" },
{ name = "opencv-python", path = "opencv_python-4.10.0.84-cp310-cp310-linux_x86_64.whl" },
{ name = "pandas-helper-calc", git = "https://github.com/scls19fr/pandas-helper-calc" },
{ name = "pyglet", specifier = ">=2.0.15,<3" },