Render lines with trap_rust for better intepolation algorithms

This commit is contained in:
Ruben van de Ven 2025-04-11 21:28:11 +02:00
parent fe9efc163f
commit 3de2346ee9
11 changed files with 491 additions and 177 deletions

View file

@ -11,6 +11,7 @@ import time
from typing import Iterable, Optional
import cv2
from dataclasses import dataclass, field
import dataclasses
import numpy as np
from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack
@ -208,7 +209,7 @@ class Track:
def __post_init__(self):
if not self.created_at:
self.created_at = time.perf_counter()
self.created_at = time.time()
def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[Camera]= None) -> np.array:
foot_coordinates = [d.get_foot_coords() for d in self.history]
@ -438,6 +439,32 @@ class Frame:
return Frame(self.index, None, self.time, self.tracks, self.H, self.camera, self.maps)
class DataclassJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, np.ndarray):
return o.tolist()
if dataclasses.is_dataclass(o):
if isinstance(o, Frame):
tracks = {}
for track_id, track in o.tracks.items():
track_obj = dataclasses.asdict(track)
track_obj['history'] = track.get_projected_history(None, o.camera)
tracks[track_id] = track_obj
d = {
'index': o.index,
'time': o.time,
'tracks': tracks,
'camera': dataclasses.asdict(o.camera),
}
else:
d = dataclasses.asdict(o)
# if isinstance(o, Frame):
# # Don't send images over JSON
# del d['img']
return d
return super().default(o)
def video_src_from_config(config) -> Iterable[UrlOrPath]:
"""deprecated, now in video_source"""
if config.video_loop:

View file

@ -203,6 +203,10 @@ connection_parser.add_argument('--zmq-face-addr',
help='Manually specity communication addr for the face detector messages',
type=str,
default="ipc:///tmp/feeds_faces")
connection_parser.add_argument('--zmq-stage-addr',
help='Manually specity communication addr for the stage messages (the rendered lines)',
type=str,
default="tcp://0.0.0.0:99174")
connection_parser.add_argument('--zmq-camera-stream-addr',
help='Manually specity communication addr for the camera stream messages',

View file

@ -34,32 +34,6 @@ from trap.video_sources import get_video_source
logger = logging.getLogger('trap.frame_emitter')
class DataclassJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, np.ndarray):
return o.tolist()
if dataclasses.is_dataclass(o):
if isinstance(o, Frame):
tracks = {}
for track_id, track in o.tracks.items():
track_obj = dataclasses.asdict(track)
track_obj['history'] = track.get_projected_history(None, o.camera)
tracks[track_id] = track_obj
d = {
'index': o.index,
'time': o.time,
'tracks': tracks,
'camera': dataclasses.asdict(o.camera),
}
else:
d = dataclasses.asdict(o)
# if isinstance(o, Frame):
# # Don't send images over JSON
# del d['img']
return d
return super().default(o)

View file

@ -24,7 +24,7 @@ from typing import Dict, Iterable, Optional
from pyglet import shapes
from PIL import Image
from trap.scenarios import TrackScenario
# from trap.scenarios import TrackScenario
from trap.counter import CounterSender
from trap.frame_emitter import DetectionState, Frame, Track, Camera
# from trap.helios import HeliosDAC, HeliosPoint
@ -230,7 +230,7 @@ class LaserRenderer:
self.prediction_frame: Frame|None = None
self.tracks: Dict[str, Track] = {}
self.scenarios: Dict[str, TrackScenario] = {}
# self.scenarios: Dict[str, TrackScenario] = {}
self.predictions: Dict[str, Track] = {}
self.drawn_tracks: Dict[str, DrawnTrack] = {}
@ -345,8 +345,8 @@ class LaserRenderer:
# self.drawn_tracks[track_id].pred_track
self.drawn_tracks[track_id].set_predictions(track)
if track_id in self.scenarios:
self.scenarios[track_id].set_prediction(track)
# if track_id in self.scenarios:
# self.scenarios[track_id].set_prediction(track)
# self.drawn_predictions[track_id] = track
except zmq.ZMQError as e:
@ -357,10 +357,10 @@ class LaserRenderer:
for track_id, track in tracker_frame.tracks.items():
self.tracks[track_id] = track
if not track_id in self.scenarios:
self.scenarios[track_id] = TrackScenario(track)
else:
self.scenarios[track_id].set_track(track)
# if not track_id in self.scenarios:
# self.scenarios[track_id] = TrackScenario(track)
# else:
# self.scenarios[track_id].set_track(track)
# self.scenarios[track_id].receive_track(track)
except zmq.ZMQError as e:
logger.debug(f'reuse tracks')

53
trap/node.py Normal file
View file

@ -0,0 +1,53 @@
import logging
from multiprocessing.synchronize import Event as BaseEvent
from argparse import Namespace
from typing import Optional
import zmq
from trap.timer import Timer
class Node():
def __init__(self, config: Namespace, is_running: BaseEvent, timer_counter: Timer):
self.config = config
self.is_running = is_running
self.timer_counter = timer_counter
self.zmq_context = zmq.Context()
self.logger = self._logger()
self.setup()
@classmethod
def _logger(cls):
return logging.getLogger(f"trap.{cls.__name__}")
def tick(self):
with self.timer_counter.get_lock():
self.timer_counter.value+=1
def setup(self):
raise RuntimeError("Not implemented setup()")
def run(self):
raise RuntimeError("Not implemented run()")
def sub(self, addr: str):
"Default zmq sub configuration"
sock = self.zmq_context.socket(zmq.SUB)
sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!!
sock.setsockopt(zmq.SUBSCRIBE, b'')
sock.connect(addr)
return sock
def pub(self, addr: str):
"Default zmq pub configuration"
sock = self.zmq_context.socket(zmq.PUB)
sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame
sock.bind(addr)
return sock
@classmethod
def start(cls, config: Namespace, is_running: BaseEvent, timer_counter: Optional[Timer]):
instance = cls(config, is_running, timer_counter)
instance.run()

View file

@ -16,6 +16,7 @@ from trap.prediction_server import run_prediction_server
from trap.preview_renderer import run_preview_renderer
from trap.animation_renderer import run_animation_renderer
from trap.socket_forwarder import run_ws_forwarder
from trap.stage import Stage
from trap.timer import TimerCollection
from trap.tracker import run_tracker
@ -91,6 +92,7 @@ def start():
timer_fe = timers.new('frame_emitter')
timer_tracker = timers.new('tracker')
timer_faces = timers.new('faces')
timer_stage = timers.new('stage')
# instantiating process with arguments
procs = [
@ -98,6 +100,7 @@ def start():
ExceptionHandlingProcess(target=run_frame_emitter, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_fe.iterations}, name='frame_emitter'),
ExceptionHandlingProcess(target=run_tracker, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_tracker.iterations}, name='tracker'),
ExceptionHandlingProcess(target=run_detector, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_faces.iterations}, name='detector'),
ExceptionHandlingProcess(target=Stage.start, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_stage.iterations}, name='stage'),
]
# if args.render_file or args.render_url or args.render_window:

View file

@ -24,7 +24,7 @@ from typing import List, Optional
from pyglet import shapes
from PIL import Image
from trap.utils import convert_world_points_to_img_points
from trap.utils import convert_world_points_to_img_points, exponentialDecay, relativePointToPolar, relativePolarToPoint
from trap.frame_emitter import DetectionState, Frame, Track, Camera
@ -45,18 +45,6 @@ class FrameAnimation:
def done(self):
return (time.time() - self.start_time) > 5
def exponentialDecay(a, b, decay, dt):
"""Exponential decay as alternative to Lerp
Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ
"""
return b + (a-b) * math.exp(-decay * dt)
def relativePointToPolar(origin, point) -> tuple[float, float]:
x, y = point[0] - origin[0], point[1] - origin[1]
return np.sqrt(x**2 + y**2), np.arctan2(y, x)
def relativePolarToPoint(origin, r, angle) -> tuple[float, float]:
return r * np.cos(angle) + origin[0], r * np.sin(angle) + origin[1]
PROJECTION_IMG = 0
PROJECTION_UNDISTORT = 1

View file

@ -1,125 +0,0 @@
from enum import Enum
import time
from typing import Optional
from statemachine import Event, State, StateMachine
from statemachine.exceptions import TransitionNotAllowed
from trap.base import Track
class ScenarioScene(Enum):
DETECTED = 1
FIRST_PREDICTION = 2
CORRECTED_PREDICTION = 3
LOITERING = 4
PLAY = 4
LOST = -1
class TrackScenario(StateMachine):
detected = State(initial=True)
substantial = State()
first_prediction = State()
corrected_prediction = State()
loitering = State()
play = State()
lost = State(final=True)
receive_track = lost.from_(
detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost"
) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long")
receive_prediction = detected.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing")
def __init__(self, track: Track):
self._track = track
self.first_prediction_track: Optional[Track] = None
self.prediction_track: Optional[Track] = None
super().__init__()
def track_is_long(self, track: Track):
return len(track.history) > 20
def track_is_lost(self, track: Track):
return track.lost
def track_is_loitering(self, track: Track):
# TODO)) Change to measure displacement over the last n seconds
return len(track.history) > (track.fps * 60) # seconds after which someone is loitering
def prediction_is_stale(self, track: Track):
# TODO use displacement instead of time
return bool(self.prediction_track and self.prediction_track.created_at < (time.perf_counter() - 2))
def prediction_is_playing(self, Track):
return False
# @property
# def track(self):
# return self._track
def set_track(self, track: Track):
self._track = track
try:
self.receive_track(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def set_prediction(self, track: Track):
if not self.first_prediction_track:
self.first_prediction_track = track
self.prediction_track = track
try:
self.receive_prediction(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def after_receive_track(self, track: Track):
print('change state')
def on_receive_track(self, track: Track):
# on event, because it happens for every receive, despite transition
print('updating track!')
# self.track = track
def on_receive_prediction(self, track: Track):
# on event, because it happens for every receive, despite transition
print('updating prediction!')
# self.track = track
def after_receive_prediction(self, track: Track):
# after
self.prediction_track = track
if not self.first_prediction_track:
self.first_prediction_track = track
def on_enter_corrected_prediction(self):
print('corrected!')
def on_enter_detected(self):
print("DETECTED!")
def on_enter_first_prediction(self):
print("Hello!")
def on_enter_detected(self):
print(f"enter {self.current_state.id}")
def on_enter_substantial(self):
print(f"enter {self.current_state.id}")
def on_enter_first_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_corrected_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_loitering(self):
print(f"enter {self.current_state.id}")
def on_enter_play(self):
print(f"enter {self.current_state.id}")
def on_enter_lost(self):
print(f"enter {self.current_state.id}")

376
trap/stage.py Normal file
View file

@ -0,0 +1,376 @@
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
import json
import logging
import time
from typing import Dict, List, Optional, Tuple
from statemachine import Event, State, StateMachine
from statemachine.exceptions import TransitionNotAllowed
import zmq
from sgan.sgan import data
from trap.base import DataclassJSONEncoder, Frame, Track
from trap.counter import CounterSender
from trap.node import Node
from trap.timer import Timer
from trap.utils import exponentialDecay, relativePointToPolar, relativePolarToPoint
class ScenarioScene(Enum):
DETECTED = 1
FIRST_PREDICTION = 2
CORRECTED_PREDICTION = 3
LOITERING = 4
PLAY = 4
LOST = -1
LOST_FADEOUT = 3
class TrackScenario(StateMachine):
detected = State(initial=True)
substantial = State()
first_prediction = State()
corrected_prediction = State()
loitering = State()
play = State()
lost = State(final=True)
receive_track = lost.from_(
detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost"
) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long")
mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play)
receive_prediction = detected.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing")
def __init__(self):
self._track = None
self.first_prediction_track: Optional[Track] = None
self.prediction_track: Optional[Track] = None
super().__init__()
def track_is_long(self, track: Track):
return len(track.history) > 20
def track_is_lost(self, track: Track):
# return self._track and self._track.created_at < time.time() - 5
return track.lost # Note, for now this is not implemented in the tacker, see check_lost()
def track_is_loitering(self, track: Track):
# TODO)) Change to measure displacement over the last n seconds
return len(track.history) > (track.fps * 60) # seconds after which someone is loitering
def prediction_is_stale(self, track: Track):
# TODO use displacement instead of time
return bool(self.prediction_track and self.prediction_track.created_at < (time.perf_counter() - 2))
def prediction_is_playing(self, Track):
return False
def check_lost(self):
if self.current_state is not self.lost and self._track and self._track.created_at < time.time() - 5:
self.mark_lost()
def set_track(self, track: Track):
if self._track and self._track.created_at > track.created_at:
# ignore old track
return
self._track = track
try:
self.receive_track(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def set_prediction(self, track: Track):
if not self._track:
# in case of the unlikely event that prediction was passed sooner
self.set_track(track)
if not self.first_prediction_track:
self.first_prediction_track = track
if self.prediction_track and (track.created_at - self.prediction_track.created_at) < .7:
# just drop tracks if the predictions come to quick
return
self.prediction_track = track
try:
self.receive_prediction(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def after_receive_track(self, track: Track):
print('change state')
def on_receive_track(self, track: Track):
# on event, because it happens for every receive, despite transition
print('updating track!')
# self.track = track
def on_receive_prediction(self, track: Track):
# on event, because it happens for every receive, despite transition
print('updating prediction!')
# self.track = track
def after_receive_prediction(self, track: Track):
# after
self.prediction_track = track
if not self.first_prediction_track:
self.first_prediction_track = track
def on_enter_corrected_prediction(self):
print('corrected!')
def on_enter_detected(self):
print("DETECTED!")
def on_enter_first_prediction(self):
print("Hello!")
def on_enter_detected(self):
print(f"enter {self.current_state.id}")
def on_enter_substantial(self):
print(f"enter {self.current_state.id}")
def on_enter_first_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_corrected_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_loitering(self):
print(f"enter {self.current_state.id}")
def on_enter_play(self):
print(f"enter {self.current_state.id}")
def on_enter_lost(self):
print(f"enter {self.current_state.id}")
self.lost_at = time.time()
def lost_for(self):
if self.current_state is self.lost:
return time.time() - self.lost_at
return None
def lost_factor(self):
l = self.lost_for()
if not l:
return 0
return l/LOST_FADEOUT
def to_lines(self) -> List[RenderableLine]:
raise RuntimeError("Not implemented yet")
class DrawnScenario(TrackScenario):
"""
Scenario contains the controls (scene, target positions)
DrawnScenario class does the actual drawing of points incl. transitions
"""
def __init__(self):
# self.created_at = time.time()
# self.track_id = track_id
self.last_update_t = time.perf_counter()
self.drawn_positions: List[Tuple[float,float]] = []
self.drawn_pred_history: List[Tuple[float,float]] = []
self.drawn_predictions: List[List[Tuple[float,float]]] = []
super().__init__()
def update_drawn_positions(self) -> List:
'''
use dt to lerp the drawn positions in the direction of current prediction
'''
# TODO: make lerp, currently quick way to get results
def int_or_not(v):
"""quick wrapper to toggle int'ing"""
return v
# return int(v)
# 0. calculate dt
# if dt is None:
t = time.perf_counter()
dt = t - self.last_update_t
self.last_update_t = t
# 1. track history, direct update
positions = self._track.get_projected_history(None, self.camera)
# TODO)) Limit history to N points, or N lenght
for i, pos in enumerate(self.drawn_positions):
self.drawn_positions[i][0] = positions[i][0]
self.drawn_positions[i][1] = positions[i][1]
if len(positions) > len(self.drawn_positions):
self.drawn_positions.extend(positions[len(self.drawn_positions):])
if self.prediction_track and self.prediction_track.predictor_history:
# 2. history as seen by predictor (Trajectron)
for i, pos in enumerate(self.drawn_pred_history):
if len(self.prediction_track.predictor_history) > i:
self.drawn_pred_history[i][0] = int_or_not(exponentialDecay(self.drawn_pred_history[i][0], self.prediction_track.predictor_history[i][0], 16, dt))
self.drawn_pred_history[i][1] = int_or_not(exponentialDecay(self.drawn_pred_history[i][1], self.prediction_track.predictor_history[i][1], 16, dt))
if len(self.prediction_track.predictor_history) > len(self.drawn_pred_history):
self.drawn_pred_history.extend(positions[len(self.drawn_pred_history):])
if self.prediction_track and self.prediction_track.predictions:
# 3. predictions
if len(self.prediction_track.predictions):
for a, drawn_prediction in enumerate(self.drawn_predictions):
for i, pos in enumerate(drawn_prediction):
# TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
decay = max(3, (18/i) if i else 10) # points further away move with more delay
decay = 16
origin = self.drawn_positions[-1]
drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i])
r = exponentialDecay(drawn_r, pred_r, decay, dt)
angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
x, y = relativePolarToPoint(origin, r, angle)
self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
if len(self.prediction_track.predictions) > len(self.drawn_predictions):
self.drawn_predictions.extend(self.prediction_track.predictions[len(self.drawn_predictions):])
# for a, drawn_prediction in self.drawn_predictions:
# if len(self.pred_coords) > len(self.drawn_predictions):
# self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):])
def to_renderable_lines(self) -> RenderableLines:
lines = []
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
points = [RenderablePoint(pos, color) for pos in self.drawn_positions]
lines.append(RenderableLine(points))
if len(self.drawn_predictions):
color = SrgbaColor(0.,0.5,0.,1.-self.lost_factor())
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
points = [RenderablePoint(pos, color) for pos in self.drawn_predictions[0]]
lines.append(RenderableLine(points))
return lines
# drawn_pred_history
# drawn_predictions
# @dataclass
# class RenderablePosition():
# x: float
# y: float
# @classmethod
# def from_list(cls, l: List[float, float]) -> RenderablePosition:
# return cls(x = float(l[0]), y=float(l[1]))
RenderablePosition = Tuple[float,float]
@dataclass
class SrgbaColor():
red: float
green: float
blue: float
alpha: float
@dataclass
class RenderablePoint():
position: RenderablePosition
color: SrgbaColor
@classmethod
def from_list(cls, l: List[float, float], color: SrgbaColor) -> RenderablePoint:
return cls([float(l[0]), float(l[1])], color)
@dataclass
class RenderableLine():
points: List[RenderablePoint]
@dataclass
class RenderableLines():
lines: List[RenderableLine]
class Stage(Node):
"""
Render a stage, on which different TrackScenarios take place to a
single image of lines. Which can be passed to different renderers
E.g. the laser or image renderers.
"""
FPS = 60
def setup(self):
# self.scenarios: List[DrawnScenario] = []
self.scenarios: Dict[str, DrawnScenario] = defaultdict(lambda: DrawnScenario())
self.trajectory_sock = self.sub(self.config.zmq_trajectory_addr)
self.prediction_sock = self.sub(self.config.zmq_prediction_addr)
self.stage_sock = self.pub(self.config.zmq_stage_addr)
self.counter = CounterSender()
def run(self):
prev_time = time.perf_counter()
while self.is_running.is_set():
self.tick()
# 1) poll & update
self.loop_receive()
# 2) render
self.loop_render()
# 3) calculate latency for desired FPS
now = time.perf_counter()
time_diff = (now - prev_time)
if time_diff < 1/self.FPS:
# print(f"sleep {1/self.FPS - time_diff}")
time.sleep(1/self.FPS - time_diff)
now += 1/self.FPS - time_diff
prev_time = now
def loop_receive(self):
# 1) receive predictions
try:
prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in prediction_frame.tracks.items():
self.scenarios[track_id].set_prediction(track)
except zmq.ZMQError as e:
self.logger.debug(f'reuse prediction')
# 2) receive tracker tracks
try:
trajectory_frame: Frame = self.trajectory_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in trajectory_frame.tracks.items():
self.scenarios[track_id].set_track(track)
self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera!
except zmq.ZMQError as e:
self.logger.debug(f'reuse tracks')
# 3) Remove stale tracks
for track_id, scenario in list(self.scenarios.items()):
# check when last tracker update was received
scenario.check_lost()
if scenario.lost_factor() > 1:
self.logger.info(f"rm track {track_id}")
del self.scenarios[track_id]
def loop_render(self):
lines: RenderableLine = []
for track_id, scenario in self.scenarios.items():
scenario.update_drawn_positions()
lines.extend(scenario.to_renderable_lines())
# print(lines)
rl = RenderableLines(lines)
self.counter.set("stage.lines", len(lines))
self.stage_sock.send_json(rl, cls=DataclassJSONEncoder)

View file

@ -1,13 +1,13 @@
import collections
from re import A
import time
from multiprocessing.sharedctypes import RawValue, Value, Array
from ctypes import c_double
from multiprocessing.sharedctypes import Value
from typing import MutableSequence
class Timer():
"""
Multiprocess timer. Count iterations in one process, while converting that
to fps in the other.
Measure 2 independent things: the freuency of tic, and the duration of tic->toc
Note that indeed these don't need to be equal
"""
@ -40,7 +40,6 @@ class Timer():
@property
def fps(self):
fpses = []
if len(self.tocs) < 2:
return 0
dt = self.tocs[-1][0] - self.tocs[0][0]

View file

@ -1,5 +1,6 @@
# lerp & inverse lerp from https://gist.github.com/laundmo/b224b1f4c8ef6ca5fe47e132c8deab56
import linecache
import math
import os
from pathlib import Path
import tracemalloc
@ -27,6 +28,20 @@ def inv_lerp(a: float, b: float, v: float) -> float:
"""
return (v - a) / (b - a)
def exponentialDecay(a, b, decay, dt):
"""Exponential decay as alternative to Lerp
Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ
"""
return b + (a-b) * math.exp(-decay * dt)
def relativePointToPolar(origin, point) -> tuple[float, float]:
x, y = point[0] - origin[0], point[1] - origin[1]
return np.sqrt(x**2 + y**2), np.arctan2(y, x)
def relativePolarToPoint(origin, r, angle) -> tuple[float, float]:
return r * np.cos(angle) + origin[0], r * np.sin(angle) + origin[1]
# def line_intersection(line1, line2):
# xdiff = (line1[0][0] - line1[1][0], line2[0][0] - line2[1][0])
# ydiff = (line1[0][1] - line1[1][1], line2[0][1] - line2[1][1])