WIP python side of annotation extraction and animation

This commit is contained in:
Ruben van de Ven 2021-12-24 14:06:55 +01:00
parent 16d8259866
commit 5f626b5b3c
3 changed files with 366 additions and 0 deletions

View file

@ -8,6 +8,8 @@ authors = ["Ruben van de Ven <git@rubenvandeven.com>"]
python = "^3.9"
tornado = "^6.1"
coloredlogs = "^15.0.1"
pydub = "^0.25.1"
svgwrite = "^1.4.1"
[tool.poetry.dev-dependencies]

0
svganim/__init__.py Normal file
View file

364
svganim/strokes.py Normal file
View file

@ -0,0 +1,364 @@
from __future__ import annotations
import json
from os import X_OK, PathLike
import os
from webserver import strokes2D
from typing import Optional, Union
import shelve
from pydub import AudioSegment
import svgwrite
class Annotation:
def __init__(self, tag: str, drawing: Drawing, t_in: float, t_out: float) -> None:
self.tag = tag
self.t_in = t_in
self.t_out = t_out
self.drawing = drawing
def getAnimationSlice(self) -> AnimationSlice:
return self.drawing.get_animation().getSlice(self.t_in, self.t_out)
def get_as_svg(self):
anim = self.getAnimationSlice()
bb = anim.get_bounding_box()
dwg = svgwrite.Drawing('/tmp/test.svg', size=(bb.width, bb.height))
anim.add_to_dwg(dwg)
dwg.save()
# TODO .... wip
print('saved!')
Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]
class Drawing:
def __init__(self, filename: Filename, metadata_dir: Filename) -> None:
self.eventfile = filename
self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")
def get_url(self) -> str:
return f"/files/{self.id}"
def get_annotations_url(self) -> str:
return f"/annotations/{self.id}"
def get_canvas_metadata(self) -> list:
with open(self.eventfile, "r") as fp:
first_line = fp.readline().strip()
if first_line.endswith(","):
first_line = first_line[:-1]
data = json.loads(first_line)
return {
"date": data[0],
"dimensions": {
"width": data[1],
"height": data[2],
},
}
def get_animation(self) -> AnimationSlice:
# with open(self.eventfile, "r") as fp:
strokes = []
with open(self.eventfile, "r") as fp:
events = json.loads("[" + fp.read() + "]")
for i, event in enumerate(events):
if i == 0:
# metadata on first line
pass
else:
if event["event"] == "viewbox":
pass
if event["event"] == "stroke":
# points = []
# for i in range(int(len(stroke) / 4)):
# p = stroke[i*4:i*4+4]
# points.append([float(p[0]), float(p[1]), int(p[2]), float(p[3])])
strokes.append(
Stroke(
event["color"],
[Point.fromTuple(tuple(p)) for p in event["points"]],
)
)
return AnimationSlice(strokes)
def get_metadata(self):
canvas = self.get_canvas_metadata()
if os.path.exists(self.metadata_fn):
with open(self.metadata_fn, "r") as fp:
metadata = json.load(fp)
else:
metadata = {}
metadata["canvas"] = canvas
return metadata
def get_absolute_viewbox(self) -> Viewbox:
return self.get_animation().get_bounding_box()
class Viewbox:
def __init__(self, x: float, y: float, width: float, height: float):
self.x = x
self.y = y
self.width = width
self.height = height
def __str__(self) -> str:
return f"{self.x} {self.y} {self.width} {self.height}"
FrameIndex = tuple[int, int]
class AnimationSlice:
# either a whole drawing or the result of applying an annotation to a drawing (an excerpt)
# TODO rename to AnimationSlice to include audio as well
def __init__(
self, strokes: list[Stroke], t_in: float = 0, t_out: float = None
) -> None:
self.strokes = strokes
self.t_in = t_in
self.t_out = t_out
# TODO: Audio
def get_bounding_box(self) -> Viewbox:
min_x, max_x = float("inf"), float("-inf")
min_y, max_y = float("inf"), float("-inf")
for s in self.strokes:
for p in s.points:
if p.x < min_x:
min_x = p.x
if p.x > max_x:
max_x = p.x
if p.y < min_y:
min_y = p.y
if p.y > max_y:
max_y = p.y
return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)
def getSlice(self, t_in: float, t_out: float) -> AnimationSlice:
frame_in = self.getIndexForTime(t_in)
frame_out = self.getIndexForTime(t_out)
strokes = self.getStrokeSlices(frame_in, frame_out)
return AnimationSlice(strokes, t_in, t_out)
def add_to_dwg(self, dwg: SvgDrawing):
group = svgwrite.container.Group()
for stroke in self.strokes:
stroke.add_to_dwg(group)
def getStrokeSlices(
self, index_in: FrameIndex, index_out: FrameIndex
) -> list[Stroke]:
"""Get list of Stroke/StrokeSlice based in in and out indexes
Based on annotation.js getStrokesSliceForPathRange(in_point, out_point)
"""
slices = []
for i in range(index_in[0], index_out[0] + 1):
try:
stroke = self.strokes[i]
except IndexError:
# out point can be Infinity. So interrupt whenever the end is reached
break
in_i = index_in[1] if index_in[0] == i else 0
out_i = index_out[1] if index_out[0] == i else len(stroke.points) - 1
slices.append(StrokeSlice(stroke, in_i, out_i))
return slices
def getIndexForTime(self, ms) -> FrameIndex:
"""Get the frame index (path, point) based on the given time
Equal to annotations.js findPositionForTime(ms)
"""
path_i = 0
point_i = 0
for i, stroke in enumerate(self.strokes):
start_at = stroke.points[0].t
end_at = stroke.points[-1].t
if start_at > ms:
break # too far
if end_at > ms:
# we are getting close, find the right point_i
path_i = i
for pi, point in enumerate(stroke.points):
if point.t:
break # too far
point_i = pi
break # done :-)
else:
# in case this is our last path, stroe this as
# best option thus far
path_i = i
point_i = len(stroke.points) - 1
return (path_i, point_i)
class AudioSlice:
def __init__(self, filename: Filename, t_in: float, t_out: float):
self.filename = filename
self.t_in = t_in # in ms
self.t_out = t_out # in ms
def export(self, format="mp3"):
"""Returns file descriptor of tempfile"""
# Opening file and extracting segment
song = AudioSegment.from_file(self.filename)
extract = song[self.t_in : self.t_out]
# Saving
return extract.export(None, format=format)
class AnnotationIndex:
def __init__(
self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
) -> None:
self.filename = filename
self.drawing_dir = drawing_dir
self.metadata_dir = metadata_dir
self.index = shelve.open(filename)
def initiate(self):
# reset the index
for key in self.index:
del self.index[key]
self.index["_drawings"] = {
d.id: d
for d in [
Drawing(fn, self.metadata_dir) for fn in self.get_drawing_filenames()
]
}
drawing: Drawing
for drawing in self.index['_drawings'].values():
meta = drawing.get_metadata()
if 'annotations' not in meta:
continue
for ann in meta['annotations']:
annotation = Annotation(ann['tag'], drawing, ann['t_in'], ann['t_out'])
if annotation.tag not in self.index:
self.index[annotation.tag] = [annotation]
else:
self.index[annotation.tag].append(
annotation
)
@property
def drawings(self) -> dict[str, Drawing]:
return self.index["_drawings"]
def get(self, tag) -> list[Annotation]:
if tag not in self.index:
return []
return self.index[tag]
def get_drawing_names(self) -> list[str]:
return [
name[:-16]
for name in os.listdir(self.drawing_dir)
if name.endswith("json_appendable")
]
def get_drawing_filenames(self) -> list[Filename]:
return [
os.path.join(self.drawing_dir, f"{name}.json_appendable")
for name in self.get_drawing_names()
]
def __del__(self):
self.index.close()
# Point = tuple[float, float, float]
class Point:
def __init__(self, x: float, y: float, last: bool, t: float):
self.x = x
self.y = y
self.last = last
self.t = t
@classmethod
def fromTuple(cls, p: tuple[float, float, int, float]):
return cls(p[0], p[1], bool(p[2]), p[3])
Points = list[Point]
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]
class Stroke:
def __init__(self, color: str, points: Points) -> None:
self.color = color
self.points = points
def add_to_dwg(self, dwg: SvgDrawing):
path = svgwrite.path.Path(self.get_as_d).stroke(self.color,1).fill("none")
dwg.add(path)
def get_bounding_box(self) -> Viewbox:
min_x, max_x = float("inf"), float("-inf")
min_y, max_y = float("inf"), float("-inf")
for p in self.points:
if p.x < min_x:
min_x = p.x
if p.x > max_x:
max_x = p.x
if p.y < min_y:
min_y = p.y
if p.y > max_y:
max_y = p.y
return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)
def get_as_d(self):
d = ""
prev_point = None
cmd = ""
for point in self.points:
if not prev_point:
# TODO multiply points by scalars for dimensions (height widht of drawing)
d += f'M{point.x:.6},{point.y:.6} '
cmd = 'M'
else:
if prev_point.last:
d += " m"
cmd = "m"
elif cmd != 'l':
d += ' l '
cmd = 'l'
diff_point = {
"x": point.x - prev_point.x,
"y": point.y - prev_point.y,
}
# TODO multiply points by scalars for dimensions (height widht of drawing)
d += f'{diff_point.x:.6},{diff_point.y:.6} '
prev_point = point
return d
class StrokeSlice(Stroke):
def __init__(self, stroke: Stroke, i_in: int = None, i_out: int = None) -> None:
self.stroke = stroke
self.i_in = 0 if i_in is None else i_in
self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out
def slice_id(self):
return f"{self.i_in}-{self.i_out}"
@property
def points(self) -> Points:
return self.stroke.points[self.i_in : self.i_out + 1]
@property
def color(self) -> str:
return self.stroke.color