From 55d86eab45752b23221d5fdbe56781cacffbec1d Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Fri, 16 May 2025 13:43:45 +0200 Subject: [PATCH] accumulate anomaly score --- trap/stage.py | 83 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/trap/stage.py b/trap/stage.py index 306f869..7cfbe2a 100644 --- a/trap/stage.py +++ b/trap/stage.py @@ -89,17 +89,28 @@ class AppendableLine(LineGenerator): class ProceduralChain(LineGenerator): MOVE_DECAY_SPEED = 50 + VELOCITY_DAMPING = 10 + VELOCITY_FACTOR = 2 link_size = .1 # 10cm # angle_constraint = 5 - def __init__(self, joints: List[Coordinate]): + def __init__(self, joints: List[Coordinate], scenario: DrawnScenario, use_velocity = False): self.joints: List[Coordinate] = joints self.target: Coordinate = joints[-1] self.ready = False self.move_decay_speed = self.MOVE_DECAY_SPEED + self.scenario = scenario + + self.use_velocity = use_velocity + if self.use_velocity: + if len(self.joints) > 1: + self.v = np.array(self.joints[-2]) - np.array(self.joints[-1]) + self.v /= np.linalg.norm(self.v) / 10 + else: + self.v = np.array([0,0]) @classmethod - def from_appendable_line(cls, al: AppendableLine) -> ProceduralChain: + def from_appendable_line(cls, al: AppendableLine, scenario: DrawnScenario) -> ProceduralChain: # TODO: create more segments: # last added points becomes the head of the chain points = list(reversed(al.points)) @@ -107,7 +118,7 @@ class ProceduralChain(LineGenerator): linestring = linestring.segmentize(cls.link_size) joints = list(linestring.coords) - return cls(joints) + return cls(joints, scenario) def update_drawn_positions(self, dt: DeltaT): if self.ready: @@ -115,33 +126,32 @@ class ProceduralChain(LineGenerator): # direction = np.array(self.joints[-1] - self.target) # TODO: check self.joints empty, and stop then + if self.use_velocity: + vx = exponentialDecayRounded(self.v[0], self.target[0] - self.joints[0][0], self.VELOCITY_DAMPING, dt, .05) + vy = exponentialDecayRounded(self.v[1], self.target[1] - self.joints[0][1], self.VELOCITY_DAMPING, dt, .05) + self.v = np.array([vx, vy]) + self.joints[0] = (float(self.joints[0][0] + self.v[0] * dt * self.VELOCITY_FACTOR), float(self.joints[0][1] + self.v[1] * dt * self.VELOCITY_FACTOR)) + else: + x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05) + y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05) + self.joints[0] = (float(x), float(y)) - x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05) - y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05) - self.joints[0] = (float(x), float(y)) - + # Loop inspired by: https://github.com/argonautcode/animal-proc-anim/blob/main/Chain.pde + # see that code for angle constrains. for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1): - diff = np.array(prev_joint) - np.array(joint) direction = diff / np.linalg.norm(diff) self.joints[i] = prev_joint - direction * self.link_size if np.isclose(self.joints[0], self.target, atol=.05).all(): # self.ready = True + # TODO: smooth transition instead of cutting off self.joints.pop(0) + self.scenario.add_anomaly_length(self.link_size) if len(self.joints) == 0: self.ready = True self._drawn_points = self.joints - # void resolve(PVector pos) { - # angles.set(0, PVector.sub(pos, joints.get(0)).heading()); - # joints.set(0, pos); - # for (int i = 1; i < joints.size(); i++) { - # float curAngle = PVector.sub(joints.get(i - 1), joints.get(i)).heading(); - # angles.set(i, constrainAngle(curAngle, angles.get(i - 1), angleConstraint)); - # joints.set(i, PVector.sub(joints.get(i - 1), PVector.fromAngle(angles.get(i)).setMag(linkSize))); - # } - # } class DiffSegment(): @@ -194,12 +204,12 @@ class DiffSegment(): # run each render tick - def update_drawn_positions(self, dt: DeltaT): + def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario): if isinstance(self.line, AppendableLine): if self.finished and self.line.ready: # convert when fully drawn # print(self, "CONVERT LINE") - self.line = ProceduralChain.from_appendable_line(self.line) + self.line = ProceduralChain.from_appendable_line(self.line, scenario) if isinstance(self.line, ProceduralChain): self.line.target = self._target_track.projected_history[-1] @@ -444,6 +454,10 @@ class DrawnScenario(TrackScenario): Scenario contains the controls (scene, target positions) DrawnScenario class does the actual drawing of points incl. transitions """ + + ANOMALY_DECAY = .2 + DISTANCE_ANOMALY_FACTOR = .05 + def __init__(self): # self.created_at = time.time() # self.track_id = track_id @@ -456,8 +470,23 @@ class DrawnScenario(TrackScenario): self.drawn_text = "" self.drawn_text_lines: List[RenderableLine] = [] - self.anomly_score = 0 # TODO: variable + self.anomaly_score = 0 # TODO: variable + self._drawn_anomaly_score = 0 super().__init__() + + def add_anomaly_length(self, length: float): + """ + append a difference in meters between point + """ + self.anomaly_score += length * self.DISTANCE_ANOMALY_FACTOR + if self.anomaly_score > 1: + self.anomaly_score = 1. + + def decay_anomaly_score(self, dt: DeltaT): + if self.anomaly_score == 0: + return + + self.anomaly_score = exponentialDecay(self.anomaly_score, 0, self.ANOMALY_DECAY, dt) def update_drawn_positions(self) -> List: ''' @@ -476,8 +505,11 @@ class DrawnScenario(TrackScenario): dt: DeltaT = t - self.last_update_t self.last_update_t = t + # 0. Update anomaly, slowly decreasing it over time + self.decay_anomaly_score(dt) + for diff in self.prediction_diffs: - diff.update_drawn_positions(dt) + diff.update_drawn_positions(dt, self) # 1. track history, direct update MAX_HISTORY = 80 @@ -545,7 +577,7 @@ class DrawnScenario(TrackScenario): # Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s - + self._drawn_anomaly_score = exponentialDecay(self._drawn_anomaly_score, self.anomaly_score, 3, dt) # print(self.drawn_predictions) @@ -599,12 +631,13 @@ class DrawnScenario(TrackScenario): points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)] lines.append(RenderableLine(points)) - # 2. Position Marker + # 2. Position Marker / anomaly score + anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout - # lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color)) + # lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color)) lines.append(circle_arc( self.drawn_positions[-1][0], self.drawn_positions[-1][1], - max(.1, self.anomly_score * 2), + max(.1, self._drawn_anomaly_score * 1.), 0, 1, anomaly_marker_color) )