diff --git a/trap/base.py b/trap/base.py index 8921cf6..1ec6e90 100644 --- a/trap/base.py +++ b/trap/base.py @@ -11,6 +11,7 @@ import time from typing import Iterable, Optional import cv2 from dataclasses import dataclass, field +import dataclasses import numpy as np from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack @@ -208,7 +209,7 @@ class Track: def __post_init__(self): if not self.created_at: - self.created_at = time.perf_counter() + self.created_at = time.time() def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[Camera]= None) -> np.array: foot_coordinates = [d.get_foot_coords() for d in self.history] @@ -438,6 +439,32 @@ class Frame: return Frame(self.index, None, self.time, self.tracks, self.H, self.camera, self.maps) +class DataclassJSONEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, np.ndarray): + return o.tolist() + if dataclasses.is_dataclass(o): + if isinstance(o, Frame): + tracks = {} + for track_id, track in o.tracks.items(): + track_obj = dataclasses.asdict(track) + track_obj['history'] = track.get_projected_history(None, o.camera) + tracks[track_id] = track_obj + d = { + 'index': o.index, + 'time': o.time, + 'tracks': tracks, + 'camera': dataclasses.asdict(o.camera), + } + else: + d = dataclasses.asdict(o) + # if isinstance(o, Frame): + # # Don't send images over JSON + # del d['img'] + return d + return super().default(o) + + def video_src_from_config(config) -> Iterable[UrlOrPath]: """deprecated, now in video_source""" if config.video_loop: diff --git a/trap/config.py b/trap/config.py index bf6017e..d1fea28 100644 --- a/trap/config.py +++ b/trap/config.py @@ -203,6 +203,10 @@ connection_parser.add_argument('--zmq-face-addr', help='Manually specity communication addr for the face detector messages', type=str, default="ipc:///tmp/feeds_faces") +connection_parser.add_argument('--zmq-stage-addr', + help='Manually specity communication addr for the stage messages (the rendered lines)', + type=str, + default="tcp://0.0.0.0:99174") connection_parser.add_argument('--zmq-camera-stream-addr', help='Manually specity communication addr for the camera stream messages', diff --git a/trap/frame_emitter.py b/trap/frame_emitter.py index 0259d4f..4007b3d 100644 --- a/trap/frame_emitter.py +++ b/trap/frame_emitter.py @@ -34,32 +34,6 @@ from trap.video_sources import get_video_source logger = logging.getLogger('trap.frame_emitter') -class DataclassJSONEncoder(json.JSONEncoder): - def default(self, o): - if isinstance(o, np.ndarray): - return o.tolist() - if dataclasses.is_dataclass(o): - if isinstance(o, Frame): - tracks = {} - for track_id, track in o.tracks.items(): - track_obj = dataclasses.asdict(track) - track_obj['history'] = track.get_projected_history(None, o.camera) - tracks[track_id] = track_obj - d = { - 'index': o.index, - 'time': o.time, - 'tracks': tracks, - 'camera': dataclasses.asdict(o.camera), - } - else: - d = dataclasses.asdict(o) - # if isinstance(o, Frame): - # # Don't send images over JSON - # del d['img'] - return d - return super().default(o) - - diff --git a/trap/laser_renderer.py b/trap/laser_renderer.py index 6074bd0..5260155 100644 --- a/trap/laser_renderer.py +++ b/trap/laser_renderer.py @@ -24,7 +24,7 @@ from typing import Dict, Iterable, Optional from pyglet import shapes from PIL import Image -from trap.scenarios import TrackScenario +# from trap.scenarios import TrackScenario from trap.counter import CounterSender from trap.frame_emitter import DetectionState, Frame, Track, Camera # from trap.helios import HeliosDAC, HeliosPoint @@ -230,7 +230,7 @@ class LaserRenderer: self.prediction_frame: Frame|None = None self.tracks: Dict[str, Track] = {} - self.scenarios: Dict[str, TrackScenario] = {} + # self.scenarios: Dict[str, TrackScenario] = {} self.predictions: Dict[str, Track] = {} self.drawn_tracks: Dict[str, DrawnTrack] = {} @@ -345,8 +345,8 @@ class LaserRenderer: # self.drawn_tracks[track_id].pred_track self.drawn_tracks[track_id].set_predictions(track) - if track_id in self.scenarios: - self.scenarios[track_id].set_prediction(track) + # if track_id in self.scenarios: + # self.scenarios[track_id].set_prediction(track) # self.drawn_predictions[track_id] = track except zmq.ZMQError as e: @@ -357,10 +357,10 @@ class LaserRenderer: for track_id, track in tracker_frame.tracks.items(): self.tracks[track_id] = track - if not track_id in self.scenarios: - self.scenarios[track_id] = TrackScenario(track) - else: - self.scenarios[track_id].set_track(track) + # if not track_id in self.scenarios: + # self.scenarios[track_id] = TrackScenario(track) + # else: + # self.scenarios[track_id].set_track(track) # self.scenarios[track_id].receive_track(track) except zmq.ZMQError as e: logger.debug(f'reuse tracks') diff --git a/trap/node.py b/trap/node.py new file mode 100644 index 0000000..22a734c --- /dev/null +++ b/trap/node.py @@ -0,0 +1,53 @@ +import logging +from multiprocessing.synchronize import Event as BaseEvent +from argparse import Namespace +from typing import Optional + +import zmq + +from trap.timer import Timer + + +class Node(): + def __init__(self, config: Namespace, is_running: BaseEvent, timer_counter: Timer): + self.config = config + self.is_running = is_running + self.timer_counter = timer_counter + self.zmq_context = zmq.Context() + self.logger = self._logger() + + self.setup() + + @classmethod + def _logger(cls): + return logging.getLogger(f"trap.{cls.__name__}") + + def tick(self): + with self.timer_counter.get_lock(): + self.timer_counter.value+=1 + + def setup(self): + raise RuntimeError("Not implemented setup()") + + def run(self): + raise RuntimeError("Not implemented run()") + + def sub(self, addr: str): + "Default zmq sub configuration" + sock = self.zmq_context.socket(zmq.SUB) + sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!! + sock.setsockopt(zmq.SUBSCRIBE, b'') + sock.connect(addr) + return sock + + def pub(self, addr: str): + "Default zmq pub configuration" + sock = self.zmq_context.socket(zmq.PUB) + sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame + sock.bind(addr) + return sock + + @classmethod + def start(cls, config: Namespace, is_running: BaseEvent, timer_counter: Optional[Timer]): + instance = cls(config, is_running, timer_counter) + instance.run() \ No newline at end of file diff --git a/trap/plumber.py b/trap/plumber.py index ce9a2ba..d2db1ef 100644 --- a/trap/plumber.py +++ b/trap/plumber.py @@ -16,6 +16,7 @@ from trap.prediction_server import run_prediction_server from trap.preview_renderer import run_preview_renderer from trap.animation_renderer import run_animation_renderer from trap.socket_forwarder import run_ws_forwarder +from trap.stage import Stage from trap.timer import TimerCollection from trap.tracker import run_tracker @@ -91,6 +92,7 @@ def start(): timer_fe = timers.new('frame_emitter') timer_tracker = timers.new('tracker') timer_faces = timers.new('faces') + timer_stage = timers.new('stage') # instantiating process with arguments procs = [ @@ -98,6 +100,7 @@ def start(): ExceptionHandlingProcess(target=run_frame_emitter, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_fe.iterations}, name='frame_emitter'), ExceptionHandlingProcess(target=run_tracker, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_tracker.iterations}, name='tracker'), ExceptionHandlingProcess(target=run_detector, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_faces.iterations}, name='detector'), + ExceptionHandlingProcess(target=Stage.start, kwargs={'config': args, 'is_running': isRunning, 'timer_counter': timer_stage.iterations}, name='stage'), ] # if args.render_file or args.render_url or args.render_window: diff --git a/trap/preview_renderer.py b/trap/preview_renderer.py index 807c308..b6e252b 100644 --- a/trap/preview_renderer.py +++ b/trap/preview_renderer.py @@ -24,7 +24,7 @@ from typing import List, Optional from pyglet import shapes from PIL import Image -from trap.utils import convert_world_points_to_img_points +from trap.utils import convert_world_points_to_img_points, exponentialDecay, relativePointToPolar, relativePolarToPoint from trap.frame_emitter import DetectionState, Frame, Track, Camera @@ -45,18 +45,6 @@ class FrameAnimation: def done(self): return (time.time() - self.start_time) > 5 -def exponentialDecay(a, b, decay, dt): - """Exponential decay as alternative to Lerp - Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ - """ - return b + (a-b) * math.exp(-decay * dt) - -def relativePointToPolar(origin, point) -> tuple[float, float]: - x, y = point[0] - origin[0], point[1] - origin[1] - return np.sqrt(x**2 + y**2), np.arctan2(y, x) - -def relativePolarToPoint(origin, r, angle) -> tuple[float, float]: - return r * np.cos(angle) + origin[0], r * np.sin(angle) + origin[1] PROJECTION_IMG = 0 PROJECTION_UNDISTORT = 1 diff --git a/trap/scenarios.py b/trap/scenarios.py deleted file mode 100644 index 4974f60..0000000 --- a/trap/scenarios.py +++ /dev/null @@ -1,125 +0,0 @@ - -from enum import Enum -import time -from typing import Optional -from statemachine import Event, State, StateMachine -from statemachine.exceptions import TransitionNotAllowed - -from trap.base import Track - - -class ScenarioScene(Enum): - DETECTED = 1 - FIRST_PREDICTION = 2 - CORRECTED_PREDICTION = 3 - LOITERING = 4 - PLAY = 4 - LOST = -1 - -class TrackScenario(StateMachine): - detected = State(initial=True) - substantial = State() - first_prediction = State() - corrected_prediction = State() - loitering = State() - play = State() - lost = State(final=True) - - receive_track = lost.from_( - detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost" - ) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long") - - receive_prediction = detected.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing") - - def __init__(self, track: Track): - self._track = track - self.first_prediction_track: Optional[Track] = None - self.prediction_track: Optional[Track] = None - super().__init__() - - def track_is_long(self, track: Track): - return len(track.history) > 20 - - def track_is_lost(self, track: Track): - return track.lost - - def track_is_loitering(self, track: Track): - # TODO)) Change to measure displacement over the last n seconds - return len(track.history) > (track.fps * 60) # seconds after which someone is loitering - - def prediction_is_stale(self, track: Track): - # TODO use displacement instead of time - return bool(self.prediction_track and self.prediction_track.created_at < (time.perf_counter() - 2)) - - def prediction_is_playing(self, Track): - return False - - # @property - # def track(self): - # return self._track - - def set_track(self, track: Track): - self._track = track - try: - self.receive_track(track) - except TransitionNotAllowed as e: - # state change is optional - pass - - def set_prediction(self, track: Track): - if not self.first_prediction_track: - self.first_prediction_track = track - - self.prediction_track = track - try: - self.receive_prediction(track) - except TransitionNotAllowed as e: - # state change is optional - pass - - def after_receive_track(self, track: Track): - print('change state') - - def on_receive_track(self, track: Track): - # on event, because it happens for every receive, despite transition - print('updating track!') - # self.track = track - - def on_receive_prediction(self, track: Track): - # on event, because it happens for every receive, despite transition - print('updating prediction!') - # self.track = track - - def after_receive_prediction(self, track: Track): - # after - self.prediction_track = track - if not self.first_prediction_track: - self.first_prediction_track = track - - def on_enter_corrected_prediction(self): - print('corrected!') - - def on_enter_detected(self): - print("DETECTED!") - - def on_enter_first_prediction(self): - print("Hello!") - - def on_enter_detected(self): - print(f"enter {self.current_state.id}") - def on_enter_substantial(self): - print(f"enter {self.current_state.id}") - def on_enter_first_prediction(self): - print(f"enter {self.current_state.id}") - def on_enter_corrected_prediction(self): - print(f"enter {self.current_state.id}") - def on_enter_loitering(self): - print(f"enter {self.current_state.id}") - def on_enter_play(self): - print(f"enter {self.current_state.id}") - def on_enter_lost(self): - print(f"enter {self.current_state.id}") - - - - diff --git a/trap/stage.py b/trap/stage.py new file mode 100644 index 0000000..d360de9 --- /dev/null +++ b/trap/stage.py @@ -0,0 +1,376 @@ +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass +from enum import Enum +import json +import logging +import time +from typing import Dict, List, Optional, Tuple +from statemachine import Event, State, StateMachine +from statemachine.exceptions import TransitionNotAllowed +import zmq + +from sgan.sgan import data +from trap.base import DataclassJSONEncoder, Frame, Track +from trap.counter import CounterSender +from trap.node import Node +from trap.timer import Timer +from trap.utils import exponentialDecay, relativePointToPolar, relativePolarToPoint + + +class ScenarioScene(Enum): + DETECTED = 1 + FIRST_PREDICTION = 2 + CORRECTED_PREDICTION = 3 + LOITERING = 4 + PLAY = 4 + LOST = -1 + +LOST_FADEOUT = 3 + +class TrackScenario(StateMachine): + detected = State(initial=True) + substantial = State() + first_prediction = State() + corrected_prediction = State() + loitering = State() + play = State() + lost = State(final=True) + + receive_track = lost.from_( + detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost" + ) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long") + + mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play) + + receive_prediction = detected.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing") + + def __init__(self): + self._track = None + self.first_prediction_track: Optional[Track] = None + self.prediction_track: Optional[Track] = None + super().__init__() + + def track_is_long(self, track: Track): + return len(track.history) > 20 + + def track_is_lost(self, track: Track): + # return self._track and self._track.created_at < time.time() - 5 + return track.lost # Note, for now this is not implemented in the tacker, see check_lost() + + def track_is_loitering(self, track: Track): + # TODO)) Change to measure displacement over the last n seconds + return len(track.history) > (track.fps * 60) # seconds after which someone is loitering + + def prediction_is_stale(self, track: Track): + # TODO use displacement instead of time + return bool(self.prediction_track and self.prediction_track.created_at < (time.perf_counter() - 2)) + + def prediction_is_playing(self, Track): + return False + + def check_lost(self): + if self.current_state is not self.lost and self._track and self._track.created_at < time.time() - 5: + self.mark_lost() + + def set_track(self, track: Track): + if self._track and self._track.created_at > track.created_at: + # ignore old track + return + + self._track = track + try: + self.receive_track(track) + except TransitionNotAllowed as e: + # state change is optional + pass + + def set_prediction(self, track: Track): + if not self._track: + # in case of the unlikely event that prediction was passed sooner + self.set_track(track) + + if not self.first_prediction_track: + self.first_prediction_track = track + + if self.prediction_track and (track.created_at - self.prediction_track.created_at) < .7: + # just drop tracks if the predictions come to quick + return + + + self.prediction_track = track + try: + self.receive_prediction(track) + except TransitionNotAllowed as e: + # state change is optional + pass + + def after_receive_track(self, track: Track): + print('change state') + + def on_receive_track(self, track: Track): + # on event, because it happens for every receive, despite transition + print('updating track!') + # self.track = track + + def on_receive_prediction(self, track: Track): + # on event, because it happens for every receive, despite transition + print('updating prediction!') + # self.track = track + + def after_receive_prediction(self, track: Track): + # after + self.prediction_track = track + if not self.first_prediction_track: + self.first_prediction_track = track + + def on_enter_corrected_prediction(self): + print('corrected!') + + def on_enter_detected(self): + print("DETECTED!") + + def on_enter_first_prediction(self): + print("Hello!") + + def on_enter_detected(self): + print(f"enter {self.current_state.id}") + def on_enter_substantial(self): + print(f"enter {self.current_state.id}") + def on_enter_first_prediction(self): + print(f"enter {self.current_state.id}") + def on_enter_corrected_prediction(self): + print(f"enter {self.current_state.id}") + def on_enter_loitering(self): + print(f"enter {self.current_state.id}") + def on_enter_play(self): + print(f"enter {self.current_state.id}") + def on_enter_lost(self): + print(f"enter {self.current_state.id}") + self.lost_at = time.time() + + def lost_for(self): + if self.current_state is self.lost: + return time.time() - self.lost_at + return None + + def lost_factor(self): + l = self.lost_for() + if not l: + return 0 + return l/LOST_FADEOUT + + def to_lines(self) -> List[RenderableLine]: + raise RuntimeError("Not implemented yet") + + +class DrawnScenario(TrackScenario): + """ + Scenario contains the controls (scene, target positions) + DrawnScenario class does the actual drawing of points incl. transitions + """ + def __init__(self): + # self.created_at = time.time() + # self.track_id = track_id + self.last_update_t = time.perf_counter() + + self.drawn_positions: List[Tuple[float,float]] = [] + self.drawn_pred_history: List[Tuple[float,float]] = [] + self.drawn_predictions: List[List[Tuple[float,float]]] = [] + super().__init__() + + def update_drawn_positions(self) -> List: + ''' + use dt to lerp the drawn positions in the direction of current prediction + ''' + # TODO: make lerp, currently quick way to get results + + def int_or_not(v): + """quick wrapper to toggle int'ing""" + return v + # return int(v) + + # 0. calculate dt + # if dt is None: + t = time.perf_counter() + dt = t - self.last_update_t + self.last_update_t = t + + # 1. track history, direct update + positions = self._track.get_projected_history(None, self.camera) + # TODO)) Limit history to N points, or N lenght + for i, pos in enumerate(self.drawn_positions): + self.drawn_positions[i][0] = positions[i][0] + self.drawn_positions[i][1] = positions[i][1] + + if len(positions) > len(self.drawn_positions): + self.drawn_positions.extend(positions[len(self.drawn_positions):]) + + if self.prediction_track and self.prediction_track.predictor_history: + # 2. history as seen by predictor (Trajectron) + for i, pos in enumerate(self.drawn_pred_history): + if len(self.prediction_track.predictor_history) > i: + self.drawn_pred_history[i][0] = int_or_not(exponentialDecay(self.drawn_pred_history[i][0], self.prediction_track.predictor_history[i][0], 16, dt)) + self.drawn_pred_history[i][1] = int_or_not(exponentialDecay(self.drawn_pred_history[i][1], self.prediction_track.predictor_history[i][1], 16, dt)) + + if len(self.prediction_track.predictor_history) > len(self.drawn_pred_history): + self.drawn_pred_history.extend(positions[len(self.drawn_pred_history):]) + + if self.prediction_track and self.prediction_track.predictions: + # 3. predictions + if len(self.prediction_track.predictions): + for a, drawn_prediction in enumerate(self.drawn_predictions): + for i, pos in enumerate(drawn_prediction): + # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) + decay = max(3, (18/i) if i else 10) # points further away move with more delay + decay = 16 + origin = self.drawn_positions[-1] + drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) + pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i]) + r = exponentialDecay(drawn_r, pred_r, decay, dt) + angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) + x, y = relativePolarToPoint(origin, r, angle) + self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y) + # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt)) + # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) + + if len(self.prediction_track.predictions) > len(self.drawn_predictions): + self.drawn_predictions.extend(self.prediction_track.predictions[len(self.drawn_predictions):]) + # for a, drawn_prediction in self.drawn_predictions: + # if len(self.pred_coords) > len(self.drawn_predictions): + # self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):]) + def to_renderable_lines(self) -> RenderableLines: + lines = [] + color = SrgbaColor(1.,0.,0.,1.-self.lost_factor()) + # positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions] + points = [RenderablePoint(pos, color) for pos in self.drawn_positions] + lines.append(RenderableLine(points)) + + if len(self.drawn_predictions): + color = SrgbaColor(0.,0.5,0.,1.-self.lost_factor()) + # positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions] + points = [RenderablePoint(pos, color) for pos in self.drawn_predictions[0]] + lines.append(RenderableLine(points)) + + return lines + + # drawn_pred_history + # drawn_predictions + + +# @dataclass +# class RenderablePosition(): +# x: float +# y: float + +# @classmethod +# def from_list(cls, l: List[float, float]) -> RenderablePosition: +# return cls(x = float(l[0]), y=float(l[1])) + +RenderablePosition = Tuple[float,float] + +@dataclass +class SrgbaColor(): + red: float + green: float + blue: float + alpha: float + +@dataclass +class RenderablePoint(): + position: RenderablePosition + color: SrgbaColor + + @classmethod + def from_list(cls, l: List[float, float], color: SrgbaColor) -> RenderablePoint: + return cls([float(l[0]), float(l[1])], color) + +@dataclass +class RenderableLine(): + points: List[RenderablePoint] + +@dataclass +class RenderableLines(): + lines: List[RenderableLine] + +class Stage(Node): + """ + Render a stage, on which different TrackScenarios take place to a + single image of lines. Which can be passed to different renderers + E.g. the laser or image renderers. + """ + + FPS = 60 + + def setup(self): + # self.scenarios: List[DrawnScenario] = [] + self.scenarios: Dict[str, DrawnScenario] = defaultdict(lambda: DrawnScenario()) + self.trajectory_sock = self.sub(self.config.zmq_trajectory_addr) + self.prediction_sock = self.sub(self.config.zmq_prediction_addr) + self.stage_sock = self.pub(self.config.zmq_stage_addr) + + self.counter = CounterSender() + + + def run(self): + prev_time = time.perf_counter() + while self.is_running.is_set(): + self.tick() + + # 1) poll & update + self.loop_receive() + + # 2) render + self.loop_render() + + # 3) calculate latency for desired FPS + now = time.perf_counter() + time_diff = (now - prev_time) + if time_diff < 1/self.FPS: + # print(f"sleep {1/self.FPS - time_diff}") + time.sleep(1/self.FPS - time_diff) + now += 1/self.FPS - time_diff + + prev_time = now + + def loop_receive(self): + # 1) receive predictions + try: + prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK) + for track_id, track in prediction_frame.tracks.items(): + self.scenarios[track_id].set_prediction(track) + except zmq.ZMQError as e: + self.logger.debug(f'reuse prediction') + + # 2) receive tracker tracks + try: + trajectory_frame: Frame = self.trajectory_sock.recv_pyobj(zmq.NOBLOCK) + for track_id, track in trajectory_frame.tracks.items(): + self.scenarios[track_id].set_track(track) + self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera! + except zmq.ZMQError as e: + self.logger.debug(f'reuse tracks') + + # 3) Remove stale tracks + for track_id, scenario in list(self.scenarios.items()): + # check when last tracker update was received + scenario.check_lost() + + if scenario.lost_factor() > 1: + self.logger.info(f"rm track {track_id}") + del self.scenarios[track_id] + + def loop_render(self): + lines: RenderableLine = [] + for track_id, scenario in self.scenarios.items(): + scenario.update_drawn_positions() + + lines.extend(scenario.to_renderable_lines()) + + # print(lines) + rl = RenderableLines(lines) + self.counter.set("stage.lines", len(lines)) + self.stage_sock.send_json(rl, cls=DataclassJSONEncoder) + + diff --git a/trap/timer.py b/trap/timer.py index d3aa8b8..bdf90ea 100644 --- a/trap/timer.py +++ b/trap/timer.py @@ -1,13 +1,13 @@ import collections -from re import A import time -from multiprocessing.sharedctypes import RawValue, Value, Array -from ctypes import c_double +from multiprocessing.sharedctypes import Value from typing import MutableSequence class Timer(): """ + Multiprocess timer. Count iterations in one process, while converting that + to fps in the other. Measure 2 independent things: the freuency of tic, and the duration of tic->toc Note that indeed these don't need to be equal """ @@ -40,7 +40,6 @@ class Timer(): @property def fps(self): - fpses = [] if len(self.tocs) < 2: return 0 dt = self.tocs[-1][0] - self.tocs[0][0] diff --git a/trap/utils.py b/trap/utils.py index 7622949..7a49800 100644 --- a/trap/utils.py +++ b/trap/utils.py @@ -1,5 +1,6 @@ # lerp & inverse lerp from https://gist.github.com/laundmo/b224b1f4c8ef6ca5fe47e132c8deab56 import linecache +import math import os from pathlib import Path import tracemalloc @@ -27,6 +28,20 @@ def inv_lerp(a: float, b: float, v: float) -> float: """ return (v - a) / (b - a) + +def exponentialDecay(a, b, decay, dt): + """Exponential decay as alternative to Lerp + Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ + """ + return b + (a-b) * math.exp(-decay * dt) + +def relativePointToPolar(origin, point) -> tuple[float, float]: + x, y = point[0] - origin[0], point[1] - origin[1] + return np.sqrt(x**2 + y**2), np.arctan2(y, x) + +def relativePolarToPoint(origin, r, angle) -> tuple[float, float]: + return r * np.cos(angle) + origin[0], r * np.sin(angle) + origin[1] + # def line_intersection(line1, line2): # xdiff = (line1[0][0] - line1[1][0], line2[0][0] - line2[1][0]) # ydiff = (line1[0][1] - line1[1][1], line2[0][1] - line2[1][1])