diff --git a/moonwalk.py b/moonwalk.py index 94cbca2..17f0722 100644 --- a/moonwalk.py +++ b/moonwalk.py @@ -18,8 +18,11 @@ import numpy as np # from deep_sort_realtime.deepsort_tracker import DeepSort from sort import Sort from collections import defaultdict +from collections.abc import Callable import math - +import easing_functions +from easing_functions.easing import EasingBase +from functools import partial Interval = float # seconds @@ -39,7 +42,11 @@ colorset = [(255, 0, 0), VELOCITY_FACTOR = 30*.05 # a pixels/second velocity of 400 gives a frame-duration of .05 images_nrs = range(125, 162) -angles = [0,15,30,45] +angles = list(range(0,360,15)) +# angles.pop(angles.index(60)) +# angles.pop(angles.index(75)) +# angles.pop(angles.index(90)) + walk_animations = {} for a in angles: @@ -50,6 +57,7 @@ for a in angles: pic: pyglet.image.ImageData = pyglet.image.load(fn) walk_images.append(pic) walk_animations[a] = pyglet.image.animation.Animation.from_image_sequence(walk_images, 1, True) + logger.info("Loaded animation for {a} degree") walk_animation = walk_animations[0] @@ -62,52 +70,141 @@ walk_animation_dim = { 'x': walk_animation.get_max_width() } +class PotmeterShape(): + value = 0 -@dataclass -class Params: - visual_variability: float = 0 - # video_fps: float = 60 - tracker_fps: float = 5.6 - # iou = None - emitter_speed: float = 1.2 # objects per second - object_velocity: float = 100 # pixels/second - velocity_decay: float = 1 # make system a bit springy - iou_threshold: float = 0 # SORT Intersection over union - movement_angle: float = 0 # angle at which the blobs move (-45 - +45 degrees) + def __init__(self, cx, cy, r, canvas: Canvas, input: Input, batch: Optional[pyglet.graphics.Batch] = None): + # A rendered potmeter + self.input = input + self.canvas = canvas + self.batch=batch + self.cx = cx + self.cy = cy + self.r = r - def add_listener(self, attr: str, callback: callable): + x, y = self.get_xy_for_t(input.t) + self.shapes = ( + pyglet.shapes.Arc(cx,cy,30, batch= batch), # potmeter + pyglet.shapes.Line(cx,cy,x,y, width=2, batch= batch) # potmeter position + ) + # TODO: mouse scroll + # TODO: listen for actual values + self.input.add_listener(self.rotate) + + def rotate(self, value, old_value, t, old_t): + x, y = self.get_xy_for_t(t) + self.shapes[1].x2 = x + self.shapes[1].y2 = y + # self.shapes[1].x2 + # self.shapes[1].y2 + pass + # self.shapes.line...... + + def get_xy_for_t(self, t): + return ( + self.cx + self.r * math.sin(t * math.pi * 2), + self.cy + self.r * math.cos(t * math.pi * 2) + ) + + def on_mouse_scroll(self, x, y, scroll_x, scroll_y): + # determine xy position to select var to change, + # then change according to scroll_y + if x >= self.cx - self.r and \ + x <= self.cx + self.r and \ + y >= self.cy - self.r and \ + y <= self.cy + self.r: + self.input.t += scroll_y / 100 + + +class Input(): + """ + maps linear input (t) to params value + """ + def __init__(self, default_t: float = 0, mapping: EasingBase = easing_functions.LinearInOut(start=0, end=100)): + # listereners are callables which have 2 float params and return nothing: + self._listeners: list[Callable[[float,float, float, float], None]] = [] + self.mapping = mapping + + self._t = None + self._v = None + + self.t = default_t + # pyglet.text.Label(100,100,30) #name + # pyglet.text.Label(100,100,30) # current value + + + @property + def t(self): + return self._t + + @t.setter + def t(self, t: float): + """ + Set the underlying linear value + """ + # 0 <= v <= 1024 + old_t = self._t + old_v = self._v + + t = min(1, max(0, t)) + + + if not old_t or abs(old_t - t) > 0.002: # resolution of 500, to mitigate analog potmeter noise + self._t = t + self._v = self.mapping(self._t) + for listener in self._listeners: + listener(self.value, old_v, self._t, old_t) + + @property + def value(self) -> float: + """ + Get value converted from self.v + """ + return self.mapping(self._t) + + def add_listener(self, callback: Callable[[float, float, float, float], None]): """ Add a listener for a change, callback gets - called with (attribute_name, old_value, new_value) + called with (old_value, new_value) """ - if not attr in self.get_listeners(): - self._listeners[attr] = [] - self._listeners[attr].append(callback) + self._listeners.append(callback) - def get_listeners(self) -> dict[str, callable]: - if not hasattr(self, "_listeners"): - self._listeners = {} + def get_listeners(self) -> list[Callable[[float, float, float, float], None]]: return self._listeners - def __setattr__(self, attr, val): - if attr == '_listeners': - super().__setattr__(attr, val) - return - - if attr == 'tracker_fps' and val < .1: - # limit tracker fps - val = .1 - - if attr == 'emitter_speed' and val < .1: - # limit tracker fps - val = .1 - - old_val = getattr(self, attr) - super().__setattr__(attr, val) - if attr in self.get_listeners(): - for listener in self._listeners[attr]: - listener(attr, old_val, val) +class SteppedEasing(EasingBase): + def __init__(self, steps: list): + self.steps = steps + pass + def ease(self, alpha): + i = math.ceil(max(1e-100, alpha) * len(self.steps)) - 1 + return self.steps[i] + +class Params: + def __init__(self): + self.visual_variability = Input(0, easing_functions.LinearInOut(start=0, end=100)) + # video_fps = Input(60 + self.tracker_fps = Input(.3, easing_functions.QuadEaseIn(start=.1, end=60)) + # iou = None + self.emitter_speed = Input(.2, easing_functions.LinearInOut(start=0, end=7)) # objects per second + self.object_velocity = Input(.4, easing_functions.LinearInOut(start=0, end=500)) # pixels/second TODO: allow negative + # velocity_decay = Input(1, easing_functions.LinearInOut(start=0, end=100)) # make system a bit springy + self.iou_threshold = Input(0, easing_functions.LinearInOut(start=0, end=1)) # SORT Intersection over union + self.movement_angle = Input(0, SteppedEasing(angles)) # angle at which the blobs move (-45 - +45 degrees) + + def keys(self) -> list[str]: + return list(filter(lambda a: not a.startswith('__') and a not in ['keys', 'items', 'values'], dir(self))) + + def items(self) -> list[tuple[str, Input]]: + return [ + (k, getattr(self, k)) for k in self.keys() + ] + + def values(self) -> list[Input]: + return [ + getattr(self, k) for k in self.keys() + ] @@ -119,13 +216,12 @@ class DetectedObject: self.w = 160 # width self.h = 320 # height self.l = -self.w # left - self.t = (self.canvas.height - self.h)/2 - math.sin(self.canvas.params.movement_angle/360*math.pi*2) * self.canvas.width/2 # top + self.t = (self.canvas.height - self.h)/2 - math.sin(self.canvas.params.movement_angle.value/360*math.pi*2) * self.canvas.width/2 # top # self.shape = pyglet.shapes.Rectangle(self.l, self.t, self.w, self.h, color=(255, 22, 20), batch=self.canvas.batch_figures) - angle = min(angles, key=lambda x:abs(x-self.canvas.params.movement_angle)) - self.shape = pyglet.sprite.Sprite(img=walk_animations[angle],x=self.l, y=self.t, batch=self.canvas.batch_figures) + self.shape = pyglet.sprite.Sprite(img=walk_animations[self.canvas.angle],x=self.l, y=self.t, batch=self.canvas.batch_figures) self.shape.scale_x = self.w/walk_animation_dim['x'] self.shape.scale_y = self.h/walk_animation_dim['y'] # rectangle.opacity = 128 @@ -136,8 +232,8 @@ class DetectedObject: """ Update position """ - self.l += dt * self.canvas.params.object_velocity * math.cos(self.canvas.params.movement_angle/360*2 * math.pi) - self.t += dt * self.canvas.params.object_velocity * math.sin(self.canvas.params.movement_angle/360*2 * math.pi) + self.l += dt * self.canvas.params.object_velocity.value * math.cos(self.canvas.params.movement_angle.value/360*2 * math.pi) + self.t += dt * self.canvas.params.object_velocity.value * math.sin(self.canvas.params.movement_angle.value/360*2 * math.pi) self.shape.x = self.l self.shape.y = self.t # TODO exponential decay with self.params.velocity_decay @@ -153,7 +249,10 @@ class ObjectEmitter: def emit(self, dt: Interval) -> list[DetectedObject]: self.lastEmit += dt - if self.lastEmit is None or self.lastEmit >= 1/self.params.emitter_speed: + if self.params.emitter_speed.value == 0: + return [] + + if self.lastEmit is None or self.lastEmit >= 1/self.params.emitter_speed.value: logger.info('emit!') obj = DetectedObject(self.canvas) self.lastEmit = 0 @@ -165,7 +264,7 @@ class MissingDict[T](dict): collections.defaultdict does not accept arguments, but this is what we want/need. This implementation should do the trick """ - def __init__(self, factory: callable, values={}): + def __init__(self, factory: Callable, values={}): self.update(values) self.factory = factory @@ -221,23 +320,35 @@ class Canvas: 'objects': pyglet.text.Label("", x=20, y=30, color=(255,255,255,255), batch=self.batch_info), 'tracks': pyglet.text.Label("", x=120, y=30, color=(255,255,255,255), batch=self.batch_info), } - for i, field in enumerate(dataclasses.fields(self.params)): - self.labels[field.name] = pyglet.text.Label(f"{field.name}: {field.default}", x=20, y=30 + 20*(i+1), color=(255,255,255,255), batch=self.batch_info) + + print('ok', self.params) + self.potmeters: list[PotmeterShape] = [] + for i, item in enumerate(self.params.values()): + self.potmeters.append( + PotmeterShape(300+80*i, 130, 30, self, item, self.batch_info) + ) + + for i, param in enumerate(self.params.items()): + name = param[0] + value = param[1] + print(name, value) + self.labels[name] = pyglet.text.Label(f"{name}: {value.value}", x=20, y=30 + 20*(i+1), color=(255,255,255,255), batch=self.batch_info) # TODO: Add a number next to the box using pyglet.graphics.Group() self.track_shapes: MissingDict[np.float64, tuple[pyglet.shapes.Box, pyglet.text.Label]] = MissingDict(self.getTrackBboxShapes) self.tracker = Sort(max_age=5, min_hits=1, iou_threshold=0) #DeepSort(max_age=5) - pyglet.clock.schedule_interval(self.on_track, 1/self.params.tracker_fps) + pyglet.clock.schedule_interval(self.on_track, 1/self.params.tracker_fps.value) self.interval_items: list[pyglet.clock._ScheduledIntervalItem] = [i for i in pyglet.clock._default._schedule_interval_items if i.func == self.on_track] - self.params.add_listener('tracker_fps', self.on_tracker_fps_change) - self.params.add_listener('iou_threshold', self.on_iou_threshold_change) - self.params.add_listener('object_velocity', self.on_object_velocity_change) - self.params.add_listener('movement_angle', self.on_movement_angle_change) + self.params.tracker_fps.add_listener(self.on_tracker_fps_change) + self.params.iou_threshold.add_listener(self.on_iou_threshold_change) + self.params.object_velocity.add_listener(self.on_object_velocity_change) + self.params.movement_angle.add_listener(self.on_movement_angle_change) + self.on_movement_angle_change(self.params.movement_angle.value, None, None, None) # set self.angle # trigger first time around - self.on_object_velocity_change('object_velocity', None, self.params.object_velocity) + self.on_object_velocity_change(self.params.object_velocity.value, None, None, None) def getTrackBboxShapes(self, track_id) -> tuple[pyglet.shapes.Box, pyglet.text.Label]: color = colorset[int(track_id) % len(colorset)] @@ -248,20 +359,21 @@ class Canvas: ) - def on_tracker_fps_change(self, field, old_value, new_value): + def on_tracker_fps_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to tracker_fps """ for ii in self.interval_items: - ii.interval = 1/new_value + ii.interval = 1/value logger.debug(f"Set tracker interval to {ii.interval}") - def on_object_velocity_change(self, field, old_value, new_value): + def on_object_velocity_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to object_velocity as to change walk animation """ - duration = max(.001, VELOCITY_FACTOR / new_value) - logger.debug(f"set frame duration to {duration=} (for {new_value} p/s, factor: {VELOCITY_FACTOR})") + # TODO: when walking backwards, animation should reverse + duration = max(.001, VELOCITY_FACTOR / max(.1, value)) + logger.debug(f"set frame duration to {duration=} (for {value} p/s, factor: {VELOCITY_FACTOR})") for anim in walk_animations.values(): for frame in anim.frames: # a velocity of @@ -271,11 +383,11 @@ class Canvas: # ii.interval = 1/new_value # logger.debug(f"Set tracker interval to {ii.interval}") - def on_movement_angle_change(self, field, old_value, new_value): + def on_movement_angle_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to movement-angle """ - angle = min(angles, key=lambda x:abs(x-new_value)) + angle = min(angles, key=lambda x:abs(x-value)) if hasattr(self, 'angle') and self.angle == angle: return @@ -283,15 +395,15 @@ class Canvas: self.angle = angle for obj in self.objects: - idx = obj.shape._frame_index + idx = obj.shape._frame_index # setting the .image property removes the frame offset, this causes the animations to sync. Because all animations have the same nr of frames, we can get and re-set this offset obj.shape.image = walk_animations[self.angle] obj.shape._frame_index = idx - def on_iou_threshold_change(self, field, old_value, new_value): + def on_iou_threshold_change(self, value, old_value, t, old_t): """ Param dataclass listener for changes to iou_threshold """ - self.tracker.iou_threshold = new_value + self.tracker.iou_threshold = value def run(self): @@ -327,32 +439,12 @@ class Canvas: if symbol == pyglet.window.key.S: self.draw_stats = not self.draw_stats logger.info(f"rendering stats: {self.draw_stats}") - if symbol == pyglet.window.key.UP: - logger.debug('up') - self.params.object_velocity += (10 if pyglet.window.key.MOD_SHIFT & modifiers else 1) - if symbol == pyglet.window.key.DOWN: - logger.debug('down') - self.params.object_velocity -= (10 if pyglet.window.key.MOD_SHIFT & modifiers else 1) def on_mouse_scroll(self, x, y, scroll_x, scroll_y): # determine xy position to select var to change, # then change according to scroll_y - for param_name, param_value in dataclasses.asdict(self.params).items(): - if x >= self.labels[param_name].x and \ - x <= (self.labels[param_name].x + self.labels[param_name].content_width) and \ - y >= self.labels[param_name].y and \ - y <= (self.labels[param_name].y + self.labels[param_name].content_height): - if param_value == 0: - if scroll_y < 1: - param_value -= .1 - else: - param_value += .1 - - if scroll_y < 0: - param_value /= (-scroll_y * 1.25) - else: - param_value *= (scroll_y * 1.25) - setattr(self.params, param_name, param_value) + for potmeter in self.potmeters: + potmeter.on_mouse_scroll(x, y, scroll_x, scroll_y) def on_track(self, dt): @@ -376,16 +468,17 @@ class Canvas: # print([t[4] for t in self.tracks]) - def set_tracker_fps(self, fps: float): - self.params.tracker_fps = fps - for interval in self.interval_items: - interval.interval = 1/fps + # def set_tracker_fps(self, fps: float): + # self.params.tracker_fps.value = fps + # for interval in self.interval_items: + # interval.interval = 1/fps def prune(self): """ Loop over objects remove those out of the frame """ + # TODO now that there is rotation, this check needs to be a bit more elaborated for i, object in enumerate(self.objects.copy()): if object.l > self.width: logging.info(f'Delete {i}') @@ -422,10 +515,9 @@ class Canvas: # self.labels['velocity'].text = f"Velocity: {self.params.object_velocity}" # self.labels['tracker_fps'].text = f"Tracker FPS: {self.params.tracker_fps}" - for name, value in dataclasses.asdict(self.params).items(): - self.labels[name].text = f"{name}: {value}" + for name, item in self.params.items(): + self.labels[name].text = f"{name}: {item.value:.3f}" - # BIG impact for track in self.tracks: nr = track[4] box: pyglet.shape.Box = self.track_shapes[nr][0] diff --git a/poetry.lock b/poetry.lock index ca8b007..27e3de2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -115,6 +115,17 @@ numpy = "*" opencv-python = "*" scipy = "*" +[[package]] +name = "easing-functions" +version = "1.0.4" +description = "A collection of the basic easing functions for python" +optional = false +python-versions = "*" +files = [ + {file = "easing_functions-1.0.4-py3-none-any.whl", hash = "sha256:27f2ce64adecde3d2d90d503ad564fc944e8f20221ba2eacf96be37c28c7ae4b"}, + {file = "easing_functions-1.0.4.tar.gz", hash = "sha256:e18c7931d445b85f28c4d15ad0a9a47bb65d4e2eefc0db3840448fae25e3f9de"}, +] + [[package]] name = "filterpy" version = "1.4.5" @@ -667,6 +678,20 @@ files = [ [package.extras] diagrams = ["jinja2", "railroad-diagrams"] +[[package]] +name = "pyserial" +version = "3.5" +description = "Python Serial Port Extension" +optional = false +python-versions = "*" +files = [ + {file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"}, + {file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"}, +] + +[package.extras] +cp2110 = ["hidapi"] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -783,4 +808,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "5bc6618b484c0a855360421a70fb7e3621e1aae77d2cc39eec8a6cf240e104cb" +content-hash = "7d635745a85a7a118b0a5d87d3f6459e8fc42faa1f5b917d9b79f157040a82ce" diff --git a/pyproject.toml b/pyproject.toml index f8050be..fc3d1e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,8 @@ pyglet = "^2.0.17" numpy = "^2.1.1" #lapx = "^0.5.10.post1" simple-online-realtime-tracking = { path = "../sort/", develop = true } +pyserial = "^3.5" +easing-functions = "^1.0.4" [tool.poetry.group.dev.dependencies] setuptools = "^75.1.0" diff --git a/walk-animation/walking-body.blend b/walk-animation/walking-body.blend index f92776c..7173ee2 100644 --- a/walk-animation/walking-body.blend +++ b/walk-animation/walking-body.blend @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:9658215bbebfec3044eb8eb9fa428ec15244cc94a1a18d697f34f00e7510ce90 -size 70663196 +oid sha256:ed280620ea9e9a5f94a9fbdc0442052b69a6baec5c42107414b513a876efc92c +size 70766516 diff --git a/walk_bw.jpg b/walk_bw.jpg deleted file mode 100644 index 23446cb..0000000 Binary files a/walk_bw.jpg and /dev/null differ