accumulate anomaly score

This commit is contained in:
Ruben van de Ven 2025-05-16 13:43:45 +02:00
parent ad7328f7c0
commit 55d86eab45

View file

@ -89,17 +89,28 @@ class AppendableLine(LineGenerator):
class ProceduralChain(LineGenerator): class ProceduralChain(LineGenerator):
MOVE_DECAY_SPEED = 50 MOVE_DECAY_SPEED = 50
VELOCITY_DAMPING = 10
VELOCITY_FACTOR = 2
link_size = .1 # 10cm link_size = .1 # 10cm
# angle_constraint = 5 # angle_constraint = 5
def __init__(self, joints: List[Coordinate]): def __init__(self, joints: List[Coordinate], scenario: DrawnScenario, use_velocity = False):
self.joints: List[Coordinate] = joints self.joints: List[Coordinate] = joints
self.target: Coordinate = joints[-1] self.target: Coordinate = joints[-1]
self.ready = False self.ready = False
self.move_decay_speed = self.MOVE_DECAY_SPEED self.move_decay_speed = self.MOVE_DECAY_SPEED
self.scenario = scenario
self.use_velocity = use_velocity
if self.use_velocity:
if len(self.joints) > 1:
self.v = np.array(self.joints[-2]) - np.array(self.joints[-1])
self.v /= np.linalg.norm(self.v) / 10
else:
self.v = np.array([0,0])
@classmethod @classmethod
def from_appendable_line(cls, al: AppendableLine) -> ProceduralChain: def from_appendable_line(cls, al: AppendableLine, scenario: DrawnScenario) -> ProceduralChain:
# TODO: create more segments: # TODO: create more segments:
# last added points becomes the head of the chain # last added points becomes the head of the chain
points = list(reversed(al.points)) points = list(reversed(al.points))
@ -107,7 +118,7 @@ class ProceduralChain(LineGenerator):
linestring = linestring.segmentize(cls.link_size) linestring = linestring.segmentize(cls.link_size)
joints = list(linestring.coords) joints = list(linestring.coords)
return cls(joints) return cls(joints, scenario)
def update_drawn_positions(self, dt: DeltaT): def update_drawn_positions(self, dt: DeltaT):
if self.ready: if self.ready:
@ -115,33 +126,32 @@ class ProceduralChain(LineGenerator):
# direction = np.array(self.joints[-1] - self.target) # direction = np.array(self.joints[-1] - self.target)
# TODO: check self.joints empty, and stop then # TODO: check self.joints empty, and stop then
if self.use_velocity:
vx = exponentialDecayRounded(self.v[0], self.target[0] - self.joints[0][0], self.VELOCITY_DAMPING, dt, .05)
vy = exponentialDecayRounded(self.v[1], self.target[1] - self.joints[0][1], self.VELOCITY_DAMPING, dt, .05)
self.v = np.array([vx, vy])
self.joints[0] = (float(self.joints[0][0] + self.v[0] * dt * self.VELOCITY_FACTOR), float(self.joints[0][1] + self.v[1] * dt * self.VELOCITY_FACTOR))
else:
x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05) x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05)
y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05) y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05)
self.joints[0] = (float(x), float(y)) self.joints[0] = (float(x), float(y))
# Loop inspired by: https://github.com/argonautcode/animal-proc-anim/blob/main/Chain.pde
# see that code for angle constrains.
for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1): for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1):
diff = np.array(prev_joint) - np.array(joint) diff = np.array(prev_joint) - np.array(joint)
direction = diff / np.linalg.norm(diff) direction = diff / np.linalg.norm(diff)
self.joints[i] = prev_joint - direction * self.link_size self.joints[i] = prev_joint - direction * self.link_size
if np.isclose(self.joints[0], self.target, atol=.05).all(): if np.isclose(self.joints[0], self.target, atol=.05).all():
# self.ready = True # self.ready = True
# TODO: smooth transition instead of cutting off
self.joints.pop(0) self.joints.pop(0)
self.scenario.add_anomaly_length(self.link_size)
if len(self.joints) == 0: if len(self.joints) == 0:
self.ready = True self.ready = True
self._drawn_points = self.joints self._drawn_points = self.joints
# void resolve(PVector pos) {
# angles.set(0, PVector.sub(pos, joints.get(0)).heading());
# joints.set(0, pos);
# for (int i = 1; i < joints.size(); i++) {
# float curAngle = PVector.sub(joints.get(i - 1), joints.get(i)).heading();
# angles.set(i, constrainAngle(curAngle, angles.get(i - 1), angleConstraint));
# joints.set(i, PVector.sub(joints.get(i - 1), PVector.fromAngle(angles.get(i)).setMag(linkSize)));
# }
# }
class DiffSegment(): class DiffSegment():
@ -194,12 +204,12 @@ class DiffSegment():
# run each render tick # run each render tick
def update_drawn_positions(self, dt: DeltaT): def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario):
if isinstance(self.line, AppendableLine): if isinstance(self.line, AppendableLine):
if self.finished and self.line.ready: if self.finished and self.line.ready:
# convert when fully drawn # convert when fully drawn
# print(self, "CONVERT LINE") # print(self, "CONVERT LINE")
self.line = ProceduralChain.from_appendable_line(self.line) self.line = ProceduralChain.from_appendable_line(self.line, scenario)
if isinstance(self.line, ProceduralChain): if isinstance(self.line, ProceduralChain):
self.line.target = self._target_track.projected_history[-1] self.line.target = self._target_track.projected_history[-1]
@ -444,6 +454,10 @@ class DrawnScenario(TrackScenario):
Scenario contains the controls (scene, target positions) Scenario contains the controls (scene, target positions)
DrawnScenario class does the actual drawing of points incl. transitions DrawnScenario class does the actual drawing of points incl. transitions
""" """
ANOMALY_DECAY = .2
DISTANCE_ANOMALY_FACTOR = .05
def __init__(self): def __init__(self):
# self.created_at = time.time() # self.created_at = time.time()
# self.track_id = track_id # self.track_id = track_id
@ -456,9 +470,24 @@ class DrawnScenario(TrackScenario):
self.drawn_text = "" self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = [] self.drawn_text_lines: List[RenderableLine] = []
self.anomly_score = 0 # TODO: variable self.anomaly_score = 0 # TODO: variable
self._drawn_anomaly_score = 0
super().__init__() super().__init__()
def add_anomaly_length(self, length: float):
"""
append a difference in meters between point
"""
self.anomaly_score += length * self.DISTANCE_ANOMALY_FACTOR
if self.anomaly_score > 1:
self.anomaly_score = 1.
def decay_anomaly_score(self, dt: DeltaT):
if self.anomaly_score == 0:
return
self.anomaly_score = exponentialDecay(self.anomaly_score, 0, self.ANOMALY_DECAY, dt)
def update_drawn_positions(self) -> List: def update_drawn_positions(self) -> List:
''' '''
use dt to lerp the drawn positions in the direction of current prediction use dt to lerp the drawn positions in the direction of current prediction
@ -476,8 +505,11 @@ class DrawnScenario(TrackScenario):
dt: DeltaT = t - self.last_update_t dt: DeltaT = t - self.last_update_t
self.last_update_t = t self.last_update_t = t
# 0. Update anomaly, slowly decreasing it over time
self.decay_anomaly_score(dt)
for diff in self.prediction_diffs: for diff in self.prediction_diffs:
diff.update_drawn_positions(dt) diff.update_drawn_positions(dt, self)
# 1. track history, direct update # 1. track history, direct update
MAX_HISTORY = 80 MAX_HISTORY = 80
@ -545,7 +577,7 @@ class DrawnScenario(TrackScenario):
# Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s # Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s
self._drawn_anomaly_score = exponentialDecay(self._drawn_anomaly_score, self.anomaly_score, 3, dt)
# print(self.drawn_predictions) # print(self.drawn_predictions)
@ -599,12 +631,13 @@ class DrawnScenario(TrackScenario):
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)] points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points)) lines.append(RenderableLine(points))
# 2. Position Marker # 2. Position Marker / anomaly score
anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout
# lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color)) # lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color))
lines.append(circle_arc( lines.append(circle_arc(
self.drawn_positions[-1][0], self.drawn_positions[-1][1], self.drawn_positions[-1][0], self.drawn_positions[-1][1],
max(.1, self.anomly_score * 2), max(.1, self._drawn_anomaly_score * 1.),
0, 1, 0, 1,
anomaly_marker_color) anomaly_marker_color)
) )