noisy squigly lines

This commit is contained in:
Ruben van de Ven 2025-07-11 14:27:23 +02:00
parent 53e0c464c8
commit f2d7c65a18
3 changed files with 195 additions and 8 deletions

View file

@ -35,6 +35,7 @@ dependencies = [
"simplification>=0.7.12",
"supervisor>=4.2.5",
"superfsmon>=1.2.3",
"noise>=1.2.2",
]
[project.scripts]

View file

@ -29,12 +29,16 @@ from trap.node import Node
from trap.timer import Timer
from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointToPolar, relativePolarToPoint
from noise import snoise2
logger = logging.getLogger('trap.stage')
Coordinate = Tuple[float, float]
DeltaT = float # delta_t in seconds
OPTION_GROW_ANOMALY_CIRCLE = False
OPTION_RENDER_DIFF_SEGMENT = False
class LineGenerator(ABC):
@abstractmethod
def update_drawn_positions(self, dt: DeltaT):
@ -252,6 +256,69 @@ class DiffSegment():
return RenderableLines([])
class DiffSegmentScan(DiffSegment):
"""
Provide alternative diffing, in the form of a sort of scan line
Should be faster with the laser
TODO: This is work in progress, does not work yet!
"""
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._target_track = prediction
self.finished = False
self._last_diff_frame_idx = 0
def finish(self):
self.finished = True
def prediction_offset(self):
"""Difference is starting moment between track and prediction"""
return self.ptrack.frame_index - self._target_track.frame_index
def nr_of_passed_points(self):
"""Number of points of the given ptrack that have passed"""
return len(self._target_track.projected_history) - 1 - self.prediction_offset()
# len(self.points) * self.POINT_INTERVAL
# run on each track update received
def update_track(self, track: ProjectedTrack):
self._target_track = track
if self.finished:
# don't add new points if finished
return
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
self._last_diff_frame_idx = track.frame_index
# run each render tick
def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario):
# if not self.finished or not self.line.ready:
# self.line.update_drawn_positions(dt)
pass # TODO: use easing
def as_renderable(self) -> RenderableLines:
if self.finished:
return RenderableLines([])
color = SrgbaColor(0,0,1,1)
# steps_diff = self.nr_of_passed_points()
idx = self.nr_of_passed_points()
if len(self.ptrack.predictions[0]) < idx+1:
self.finish()
return RenderableLines([])
points = [self._target_track.projected_history[-1], self.ptrack.predictions[0][idx]]
points = [RenderablePoint(pos, color) for pos in points]
line = RenderableLine(points)
return RenderableLines([line])
class ScenarioScene(Enum):
DETECTED = 1
FIRST_PREDICTION = 2
@ -368,12 +435,17 @@ class TrackScenario(StateMachine):
# just drop tracks if the predictions come to quick
return
if track._track.predictions is None or not len(track._track.predictions):
# don't count to predictions if no prediction is set of given track (e.g. young tracks)
return
self.predictions.append(track)
if len(self.prediction_diffs):
self.prediction_diffs[-1].finish() # existing diffing can end
# and create a new one
self.prediction_diffs.append(DiffSegment(track))
# self.prediction_diffs.append(DiffSegmentScan(track))
# check to change state
try:
@ -457,6 +529,7 @@ class DrawnScenario(TrackScenario):
# self.track_id = track_id
self.last_update_t = time.perf_counter()
self.drawn_position: Optional[Coordinate] = None
self.drawn_positions: List[Coordinate] = []
self.drawn_pred_history: List[Coordinate] = []
self.drawn_predictions: List[List[Coordinate]] = []
@ -511,19 +584,25 @@ class DrawnScenario(TrackScenario):
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
# self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:]
self.drawn_positions = self.track.projected_history
if self.drawn_position is None:
self.drawn_position = self.drawn_positions[-1]
else:
self.drawn_position[0] = exponentialDecay(self.drawn_position[0], self.drawn_positions[-1][0], 3, dt)
self.drawn_position[1] = exponentialDecay(self.drawn_position[1], self.drawn_positions[-1][1], 3, dt)
# 3. predictions
if len(self.drawn_predictions) < len(self.predictions):
# first prediction
if len(self.drawn_predictions) == 0:
self.drawn_predictions.append(self.predictions[-1].predictions[0])
last_pred = self.predictions[-1]
self.drawn_predictions.append(last_pred.predictions[0])
else:
# if a new prediction has arised, transition from existing one.
# First, cut existing prediction
# CUT_GAP indicates that some is lost in the transition, to prevent glitches when velocity of person changes
end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP
keep = self.drawn_predictions[-1][end_step:]
last_item: Coordinate = keep[-1]
last_item: Coordinate = (keep)[-1]
self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step] # cut the old part
# print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep))
# duplicate last item, so the new one has the same nr. of points as the incoming prediction (so it can actually transition)
@ -623,10 +702,20 @@ class DrawnScenario(TrackScenario):
# 1. Trajectory history
# drawable_points, alphas = self.drawn_positions[:self.MAX_HISTORY], [1]*len(self.drawn_positions)
drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
# perlin/simplex noise
# dt: change speed. Divide to make slower
# amp: amplitude of noise
# frequency: make smaller to make longer waves
noisy_points = apply_perlin_noise_to_line_normal(self.drawn_positions, t/3, .3, .05)
drawable_points, alphas = points_fade_out_alpha_mask(noisy_points, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,1.,1.-self.lost_factor())
# TODO: effect configuration
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
# points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points))
# 2. Position Marker / anomaly score
@ -635,7 +724,8 @@ class DrawnScenario(TrackScenario):
# lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color))
# last point, (but this draws line in circle, requiring a 'jump back' for the laser)
cx, cy = self.drawn_positions[-1][0], self.drawn_positions[-1][1],
radius = max(.1, self._drawn_anomaly_score * 1.)
radius = max(.1, self._drawn_anomaly_score * 1.) if OPTION_GROW_ANOMALY_CIRCLE else .1
steps=5
if len(self.drawn_positions) >= steps:
@ -690,6 +780,8 @@ class DrawnScenario(TrackScenario):
# colors = [color.as_faded(1) for a2 in range(len(drawn_diff))]
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)]
# lines.append(RenderableLine(points))
if OPTION_RENDER_DIFF_SEGMENT:
for diff in self.prediction_diffs:
lines.append_lines(diff.as_renderable())
@ -878,7 +970,7 @@ class Stage(Node):
# rl = RenderableLines(lines)
# with open('/tmp/lines.pcl', 'wb') as fp:
# pickle.dump(rl, fp)
rl = lines.as_simplified(SimplifyMethod.RDP, .01) # or segmentise (see shapely)
rl = lines.as_simplified(SimplifyMethod.RDP, .003) # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines.lines))
self.counter.set("stage.points_orig", lines.point_count())
self.counter.set("stage.points", rl.point_count())
@ -905,3 +997,89 @@ class Stage(Node):
return argparser
# TODO place somewhere else:
# Gemma3:27b prompt: "python. Given a list of coordinates, that describes a line: `drawable_points: List[Tuple[float,float]]` apply perlin noise over the normal of the line, that changes over time `dt`."
def apply_perlin_noise_to_line_normal(drawable_points: np.ndarray, dt: float, amplitude: float = 1.0, frequency: float = 1.0) -> np.ndarray:
"""
Applies Perlin noise to the normals of a line described by a list of coordinates, changing over time.
Args:
drawable_points: A list of (x, y) tuples representing the points of the line.
dt: The time delta, used to animate the Perlin noise.
amplitude: The strength of the Perlin noise effect.
frequency: The frequency of the Perlin noise (how many waves per unit).
Returns:
A new list of (x, y) tuples representing the line with Perlin noise applied to the normals. If drawable_points
has fewer than 2 points, it returns the original list unchanged.
Raises:
TypeError: If drawable_points is not a list or dt is not a float.
ValueError: If the input points are not tuples of length 2.
"""
# if not isinstance(drawable_points, list):
# print(drawable_points, type(drawable_points))
# raise TypeError("drawable_points must be a list.")
if not isinstance(dt, float):
raise TypeError("dt must be a float.")
if len(drawable_points) < 2:
return drawable_points # Nothing to do with fewer than 2 points
# for point in drawable_points:
# if not isinstance(point, tuple) or len(point) != 2:
# raise ValueError("Each point in drawable_points must be a tuple of length 2.")
# noise = PerlinNoise(octaves=4) # You can adjust octaves for different noise patterns
new_points = []
for i in range(len(drawable_points)):
x, y = drawable_points[i]
# Calculate the normal vector. We'll approximate it using the previous and next points.
if i == 0:
# For the first point, use the next point to estimate the normal
next_x, next_y = drawable_points[i + 1]
normal_x = next_y - y
normal_y = -(next_x - x)
elif i == len(drawable_points) - 1:
# For the last point, use the previous point
prev_x, prev_y = drawable_points[i - 1]
normal_x = y - prev_y
normal_y = -(x - prev_x)
else:
prev_x, prev_y = drawable_points[i - 1]
next_x, next_y = drawable_points[i + 1]
normal_x = next_y - prev_y
normal_y = -(next_x - prev_x)
# Normalize the normal vector
norm = np.sqrt(normal_x**2 + normal_y**2)
if norm > 0:
normal_x /= norm
normal_y /= norm
# Apply Perlin noise to the normal
# noise_x = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_x
# noise_y = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_y
noise = snoise2(i * frequency, dt % 1000, octaves=4)
noise_x = noise * amplitude * normal_x
noise_y = noise * amplitude * normal_y
# print(noise_x, noise_y, dt, frequency, i, dt, snoise2(i * frequency, dt % 1000, octaves=4))
# Add the noise to the point's coordinates
new_x = x + noise_x
new_y = y + noise_y
new_points.append((new_x, new_y))
# print(drawable_points, new_points)
return np.array(new_points)

View file

@ -1300,6 +1300,12 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/5e/cb3dbdf3ae18e281b8b1b4691bb5d3465b383e04bde2c2a782c893f1ee21/nicegui-2.13.0-py3-none-any.whl", hash = "sha256:2343d37885df2c2e388a4f4c3f0ce9b308be02e16b0303108471a1a38fe3508f", size = 16482500 },
]
[[package]]
name = "noise"
version = "1.2.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/18/29/bb830ee6d934311e17a7a4fa1368faf3e73fbb09c0d80fc44e41828df177/noise-1.2.2.tar.gz", hash = "sha256:57a2797436574391ff63a111e852e53a4164ecd81ad23639641743cd1a209b65", size = 125615 }
[[package]]
name = "notebook"
version = "7.3.3"
@ -2522,6 +2528,7 @@ dependencies = [
{ name = "gdown" },
{ name = "ipywidgets" },
{ name = "jsonlines" },
{ name = "noise" },
{ name = "opencv-python" },
{ name = "pandas-helper-calc" },
{ name = "pyglet" },
@ -2556,6 +2563,7 @@ requires-dist = [
{ name = "gdown", specifier = ">=4.7.1,<5" },
{ name = "ipywidgets", specifier = ">=8.1.5,<9" },
{ name = "jsonlines", specifier = ">=4.0.0,<5" },
{ name = "noise", specifier = ">=1.2.2" },
{ name = "opencv-python", path = "opencv_python-4.10.0.84-cp310-cp310-linux_x86_64.whl" },
{ name = "pandas-helper-calc", git = "https://github.com/scls19fr/pandas-helper-calc" },
{ name = "pyglet", specifier = ">=2.0.15,<3" },