""" This project produces "detections" for DeepSORT in an attempt to trick the algorithm into moonwalking over a crowd. The framerate of rendering and detection can be distinct. Also, all parameters (incl. framerate) can change along the way, thus positions cannot depend on that. """ from __future__ import annotations from dataclasses import dataclass import dataclasses from typing import Optional import time import pyglet import logging import numpy as np # from deep_sort_realtime.deepsort_tracker import DeepSort from sort import Sort from collections import defaultdict from collections.abc import Callable import math import easing_functions from easing_functions.easing import EasingBase from functools import partial Interval = float # seconds logger = logging.getLogger('moonwalk') colorset = [(255, 0, 0), (0, 0, 255), (0, 255, 0), (0, 255, 255), (255, 0, 0), (255, 0, 255), (255, 255, 0) ] VELOCITY_FACTOR = 30*.05 # a pixels/second velocity of 400 gives a frame-duration of .05 images_nrs = range(125, 162) angles = list(range(0,360,15)) # angles.pop(angles.index(60)) # angles.pop(angles.index(75)) # angles.pop(angles.index(90)) walk_animations = {} for a in angles: walk_images = [] for nr in images_nrs: # smaller textures doesn't seem to matter much in speed fn = f'walk-animation/OUT/frame{nr:03d}_{a:02d}.png' pic: pyglet.image.ImageData = pyglet.image.load(fn) walk_images.append(pic) walk_animations[a] = pyglet.image.animation.Animation.from_image_sequence(walk_images, 1, True) logger.info("Loaded animation for {a} degree") walk_animation = walk_animations[0] # walk_image = pyglet.image.load('walk_bw.jpg') # TODO investigaet pyglet.resource: https://pyglet.readthedocs.io/en/latest/programming_guide/image.html#displaying-images # image_grid = pyglet.image.ImageGrid(walk_image, rows=1, columns=4) # # texture_grid = image_grid.get_texture_sequence() # walk_animation = image_grid.get_animation(period=.05) walk_animation_dim = { 'y': walk_animation.get_max_height(), 'x': walk_animation.get_max_width() } class PotmeterShape(): value = 0 def __init__(self, cx, cy, r, canvas: Canvas, input: Input, batch: Optional[pyglet.graphics.Batch] = None): # A rendered potmeter self.input = input self.canvas = canvas self.batch=batch self.cx = cx self.cy = cy self.r = r x, y = self.get_xy_for_t(input.t) self.shapes = ( pyglet.shapes.Arc(cx,cy,30, batch= batch), # potmeter pyglet.shapes.Line(cx,cy,x,y, width=2, batch= batch) # potmeter position ) # TODO: mouse scroll # TODO: listen for actual values self.input.add_listener(self.rotate) def rotate(self, value, old_value, t, old_t): x, y = self.get_xy_for_t(t) self.shapes[1].x2 = x self.shapes[1].y2 = y # self.shapes[1].x2 # self.shapes[1].y2 pass # self.shapes.line...... def get_xy_for_t(self, t): return ( self.cx + self.r * math.sin(t * math.pi * 2), self.cy + self.r * math.cos(t * math.pi * 2) ) def on_mouse_scroll(self, x, y, scroll_x, scroll_y): # determine xy position to select var to change, # then change according to scroll_y if x >= self.cx - self.r and \ x <= self.cx + self.r and \ y >= self.cy - self.r and \ y <= self.cy + self.r: self.input.t += scroll_y / 100 class Input(): """ maps linear input (t) to params value """ def __init__(self, default_t: float = 0, mapping: EasingBase = easing_functions.LinearInOut(start=0, end=100)): # listereners are callables which have 2 float params and return nothing: self._listeners: list[Callable[[float,float, float, float], None]] = [] self.mapping = mapping self._t = None self._v = None self.t = default_t # pyglet.text.Label(100,100,30) #name # pyglet.text.Label(100,100,30) # current value @property def t(self): return self._t @t.setter def t(self, t: float): """ Set the underlying linear value """ # 0 <= v <= 1024 old_t = self._t old_v = self._v t = min(1, max(0, t)) if not old_t or abs(old_t - t) > 0.002: # resolution of 500, to mitigate analog potmeter noise self._t = t self._v = self.mapping(self._t) for listener in self._listeners: listener(self.value, old_v, self._t, old_t) @property def value(self) -> float: """ Get value converted from self.v """ return self.mapping(self._t) def add_listener(self, callback: Callable[[float, float, float, float], None]): """ Add a listener for a change, callback gets called with (old_value, new_value) """ self._listeners.append(callback) def get_listeners(self) -> list[Callable[[float, float, float, float], None]]: return self._listeners class SteppedEasing(EasingBase): def __init__(self, steps: list): self.steps = steps pass def ease(self, alpha): i = math.ceil(max(1e-100, alpha) * len(self.steps)) - 1 return self.steps[i] class Params: def __init__(self): self.visual_variability = Input(0, easing_functions.LinearInOut(start=0, end=100)) # video_fps = Input(60 self.tracker_fps = Input(.3, easing_functions.QuadEaseIn(start=.1, end=60)) # iou = None self.emitter_speed = Input(.2, easing_functions.LinearInOut(start=0, end=7)) # objects per second self.object_velocity = Input(.4, easing_functions.LinearInOut(start=0, end=500)) # pixels/second TODO: allow negative # velocity_decay = Input(1, easing_functions.LinearInOut(start=0, end=100)) # make system a bit springy self.iou_threshold = Input(0, easing_functions.LinearInOut(start=0, end=1)) # SORT Intersection over union self.movement_angle = Input(0, SteppedEasing(angles)) # angle at which the blobs move (-45 - +45 degrees) def keys(self) -> list[str]: return list(filter(lambda a: not a.startswith('__') and a not in ['keys', 'items', 'values'], dir(self))) def items(self) -> list[tuple[str, Input]]: return [ (k, getattr(self, k)) for k in self.keys() ] def values(self) -> list[Input]: return [ getattr(self, k) for k in self.keys() ] class DetectedObject: def __init__(self, canvas: Canvas): self.canvas = canvas # TODO handle variability self.v = self.canvas.params.object_velocity self.w = 160 # width self.h = 320 # height self.l = -self.w # left self.t = (self.canvas.height - self.h)/2 - math.sin(self.canvas.params.movement_angle.value/360*math.pi*2) * self.canvas.width/2 # top # self.shape = pyglet.shapes.Rectangle(self.l, self.t, self.w, self.h, color=(255, 22, 20), batch=self.canvas.batch_figures) self.shape = pyglet.sprite.Sprite(img=walk_animations[self.canvas.angle],x=self.l, y=self.t, batch=self.canvas.batch_figures) self.shape.scale_x = self.w/walk_animation_dim['x'] self.shape.scale_y = self.h/walk_animation_dim['y'] # rectangle.opacity = 128 # rectangle.rotation = 33 #TODO renderer def update(self, dt: Interval): """ Update position """ self.l += dt * self.canvas.params.object_velocity.value * math.cos(self.canvas.params.movement_angle.value/360*2 * math.pi) self.t += dt * self.canvas.params.object_velocity.value * math.sin(self.canvas.params.movement_angle.value/360*2 * math.pi) self.shape.x = self.l self.shape.y = self.t # TODO exponential decay with self.params.velocity_decay class ObjectEmitter: """ Emit detectable objects """ def __init__(self, params: Params, canvas: Canvas): self.lastEmit = 0 self.params = params self.canvas = canvas def emit(self, dt: Interval) -> list[DetectedObject]: self.lastEmit += dt if self.params.emitter_speed.value == 0: return [] if self.lastEmit is None or self.lastEmit >= 1/self.params.emitter_speed.value: logger.info('emit!') obj = DetectedObject(self.canvas) self.lastEmit = 0 return [obj] return [] class MissingDict[T](dict): """ collections.defaultdict does not accept arguments, but this is what we want/need. This implementation should do the trick """ def __init__(self, factory: Callable, values={}): self.update(values) self.factory = factory def __missing__(self, key) -> T: self[key] = self.factory(key) return self[key] class Canvas: """ A canvas with moving objects """ def __init__(self, params: Params): self.width = 1920 self.height = 1080 self.objects: list[DetectedObject] = [] self.lastSnapshot: Optional[float] = None self.params = params self.emitter = ObjectEmitter(self.params, self) self.hide_stats = False config = pyglet.gl.Config(sample_buffers=1, samples=4, double_buffer=True) # , fullscreen=self.config.render_window self.window = pyglet.window.Window(width=self.width, height=self.height, config=config, fullscreen=False) self.window.set_handler('on_draw', self.on_draw) self.window.set_handler('on_key_press', self.on_key_press) self.window.set_handler('on_mouse_scroll', self.on_mouse_scroll) self.window.set_handler('on_refresh', self.on_refresh) # self.window.set_handler('on_refresh', self.on_refresh) # self.window.set_handler('on_close', self.on_close) # Purple background color: pyglet.gl.glClearColor(230/255,230/255,230/255,230/255) self.draw_stats = True self.fps_display = pyglet.window.FPSDisplay(window=self.window, color=(255,255,255,255), samples=100) self.fps_display.label.x = self.window.width - 150 self.fps_display.label.y = self.window.height - 17 self.fps_display.label.bold = False self.fps_display.label.font_size = 10 self.batch_figures = pyglet.graphics.Batch() self.batch_bounding_boxes = pyglet.graphics.Batch() self.batch_info = pyglet.graphics.Batch() self.tracks = [] self.labels = { 'objects': pyglet.text.Label("", x=20, y=30, color=(255,255,255,255), batch=self.batch_info), 'tracks': pyglet.text.Label("", x=120, y=30, color=(255,255,255,255), batch=self.batch_info), } print('ok', self.params) self.potmeters: list[PotmeterShape] = [] for i, item in enumerate(self.params.values()): self.potmeters.append( PotmeterShape(300+80*i, 130, 30, self, item, self.batch_info) ) for i, param in enumerate(self.params.items()): name = param[0] value = param[1] print(name, value) self.labels[name] = pyglet.text.Label(f"{name}: {value.value}", x=20, y=30 + 20*(i+1), color=(255,255,255,255), batch=self.batch_info) # TODO: Add a number next to the box using pyglet.graphics.Group() self.track_shapes: MissingDict[np.float64, tuple[pyglet.shapes.Box, pyglet.text.Label]] = MissingDict(self.getTrackBboxShapes) self.tracker = Sort(max_age=5, min_hits=1, iou_threshold=0) #DeepSort(max_age=5) pyglet.clock.schedule_interval(self.on_track, 1/self.params.tracker_fps.value) self.interval_items: list[pyglet.clock._ScheduledIntervalItem] = [i for i in pyglet.clock._default._schedule_interval_items if i.func == self.on_track] self.params.tracker_fps.add_listener(self.on_tracker_fps_change) self.params.iou_threshold.add_listener(self.on_iou_threshold_change) self.params.object_velocity.add_listener(self.on_object_velocity_change) self.params.movement_angle.add_listener(self.on_movement_angle_change) self.on_movement_angle_change(self.params.movement_angle.value, None, None, None) # set self.angle # trigger first time around self.on_object_velocity_change(self.params.object_velocity.value, None, None, None) def getTrackBboxShapes(self, track_id) -> tuple[pyglet.shapes.Box, pyglet.text.Label]: color = colorset[int(track_id) % len(colorset)] return ( pyglet.shapes.Box(0,0,0,0,color=color,thickness=2, batch=self.batch_bounding_boxes), pyglet.text.Label(f"{track_id:.0f}", x=0, y=0, anchor_y='top', color=color, batch=self.batch_bounding_boxes), # pyglet.shapes.Lab(0,0,0,0,color=(0,255,0),thickness=2, batch=self.batch_bounding_boxes) ) def on_tracker_fps_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to tracker_fps """ for ii in self.interval_items: ii.interval = 1/value logger.debug(f"Set tracker interval to {ii.interval}") def on_object_velocity_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to object_velocity as to change walk animation """ # TODO: when walking backwards, animation should reverse duration = max(.001, VELOCITY_FACTOR / max(.1, value)) logger.debug(f"set frame duration to {duration=} (for {value} p/s, factor: {VELOCITY_FACTOR})") for anim in walk_animations.values(): for frame in anim.frames: # a velocity of frame.duration = duration # for ii in self.interval_items: # ii.interval = 1/new_value # logger.debug(f"Set tracker interval to {ii.interval}") def on_movement_angle_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to movement-angle """ angle = min(angles, key=lambda x:abs(x-value)) if hasattr(self, 'angle') and self.angle == angle: return self.angle = angle for obj in self.objects: idx = obj.shape._frame_index # setting the .image property removes the frame offset, this causes the animations to sync. Because all animations have the same nr of frames, we can get and re-set this offset obj.shape.image = walk_animations[self.angle] obj.shape._frame_index = idx def on_iou_threshold_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to iou_threshold """ self.tracker.iou_threshold = value def run(self): self.event_loop = pyglet.app.EventLoop() # pyglet.clock.schedule_interval(self.check_running, 0.1) # pyglet.clock.schedule(self.check_frames) # pyglet.clock.schedule(self.track) self.event_loop.run() def on_draw(self): self.window.clear() self.batch_figures.draw() self.batch_bounding_boxes.draw() self.batch_info.draw() if self.draw_stats: self.fps_display.draw() def on_close(self): logger.info('closing') pass def on_key_press(self, symbol, modifiers): if symbol == pyglet.window.key.Q or symbol == pyglet.window.key.ESCAPE: self.window.close() exit() if symbol == pyglet.window.key.V: level = logging.INFO if logger.getEffectiveLevel() == logging.DEBUG else logging.DEBUG logger.setLevel(level) logger.info(f"set log level: {level}") if symbol == pyglet.window.key.S: self.draw_stats = not self.draw_stats logger.info(f"rendering stats: {self.draw_stats}") def on_mouse_scroll(self, x, y, scroll_x, scroll_y): # determine xy position to select var to change, # then change according to scroll_y for potmeter in self.potmeters: potmeter.on_mouse_scroll(x, y, scroll_x, scroll_y) def on_track(self, dt): # bbs = object_detector.detect(frame) objects = self.snapshot() # TODO chipper & embedder # bbs = [([o.l, o.t, o.w, o.h], 1, 1) for o in objects] # DEEP SORT: self.tracks = self.tracker.update_tracks(bbs, frame=np.zeros([1280,720])) # bbs expected to be a list of detections, each in tuples of ( [left,top,w,h], confidence, detection_class ) bbs = np.array([[o.l, o.t, o.l+ o.w, o.t+o.h, 1, 1] for o in objects]) self.tracks = self.tracker.update(bbs) # a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...] # self.tracks is a np array where each row contains a valid bounding box and track_id (last column) # remove old shapes ids = [track[4] for track in self.tracks] for k in list(self.track_shapes.keys()): if k not in ids: self.track_shapes.pop(k) logger.debug(f"shape removed {k}" ) # print([t[4] for t in self.tracks]) # def set_tracker_fps(self, fps: float): # self.params.tracker_fps.value = fps # for interval in self.interval_items: # interval.interval = 1/fps def prune(self): """ Loop over objects remove those out of the frame """ # TODO now that there is rotation, this check needs to be a bit more elaborated for i, object in enumerate(self.objects.copy()): if object.l > self.width: logging.info(f'Delete {i}') el = self.objects.pop(i) el.shape.delete() # clear the attached shape def snapshot(self) -> list[DetectedObject]: """ Update all object positions base on dt = now - lastSnapshot """ now = time.monotonic() if self.lastSnapshot is None: self.lastSnapshot = now dt = now - self.lastSnapshot self.objects.extend(self.emitter.emit(dt)) for object in self.objects: object.update(dt) self.prune() self.lastSnapshot = now # print(f"duration: {time.monotonic()-now) return self.objects def on_refresh(self, dt: float): objects = self.snapshot() self.labels['objects'].text = f"Objects: {len(objects)}" self.labels['tracks'].text = f"Tracks: {len(self.tracks)}" # self.labels['velocity'].text = f"Velocity: {self.params.object_velocity}" # self.labels['tracker_fps'].text = f"Tracker FPS: {self.params.tracker_fps}" for name, item in self.params.items(): self.labels[name].text = f"{name}: {item.value:.3f}" for track in self.tracks: nr = track[4] box: pyglet.shape.Box = self.track_shapes[nr][0] label: pyglet.text.Label = self.track_shapes[nr][1] if box.x == 0 and box.y == 0: # set initial position box.x = track[0] box.y = track[1] label.x = track[0] label.y = track[1] box.width = track[2] - track[0] box.height = track[3] - track[1] else: # not really impact # exponential decay label.x = box.x = exponentialDecay(box.x, track[0], 12, dt) label.y = box.y = exponentialDecay(box.y, track[1], 12, dt) # setting width and height on label is not needed _and_ makes it super slow box.width = exponentialDecay(box.width, track[2] - track[0], 12, dt) box.height = exponentialDecay(box.height, track[3] - track[1], 12, dt) # TODO: shape in DetectedObject # rectangle = shapes.Rectangle(250, 300, 400, 200, color=(255, 22, 20), batch=batch) # rectangle.opacity = 128 # rectangle.rotation = 33 # print(objects) # id(objects) def exponentialDecay(a, b, decay, dt): """Exponential decay as alternative to Lerp Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ """ return b + (a-b) * math.exp(-decay * dt) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) params = Params() canvas = Canvas(params) canvas.run()