507 lines
17 KiB
Python
507 lines
17 KiB
Python
from __future__ import annotations
|
|
import json
|
|
from os import X_OK, PathLike
|
|
import os
|
|
from typing import Optional, Union
|
|
import shelve
|
|
from pydub import AudioSegment
|
|
import svgwrite
|
|
import tempfile
|
|
import io
|
|
import logging
|
|
|
|
logger = logging.getLogger('svganim.strokes')
|
|
|
|
|
|
class Annotation:
|
|
def __init__(self, tag: str, drawing: Drawing, t_in: float, t_out: float) -> None:
|
|
self.tag = tag
|
|
self.t_in = t_in
|
|
self.t_out = t_out
|
|
self.drawing = drawing
|
|
|
|
@property
|
|
def id(self) -> str:
|
|
return f'{self.drawing.id}:{self.tag}:{self.t_in}:{self.t_out}'
|
|
|
|
def getAnimationSlice(self) -> AnimationSlice:
|
|
return self.drawing.get_animation().getSlice(self.t_in, self.t_out)
|
|
|
|
def get_as_svg(self) -> str:
|
|
return self.getAnimationSlice().get_as_svg()
|
|
|
|
|
|
Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]
|
|
|
|
|
|
class Drawing:
|
|
def __init__(self, filename: Filename, metadata_dir: Filename, basedir: Filename) -> None:
|
|
self.eventfile = filename
|
|
self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
|
|
self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")
|
|
self.basedir = basedir
|
|
|
|
def get_url(self) -> str:
|
|
return f"/files/{self.id}"
|
|
|
|
def get_annotations_url(self) -> str:
|
|
return f"/annotations/{self.id}"
|
|
|
|
def get_canvas_metadata(self) -> list:
|
|
logger.info(f'metadata for {self.id}')
|
|
with open(self.eventfile, "r") as fp:
|
|
first_line = fp.readline().strip()
|
|
|
|
if first_line.endswith(","):
|
|
first_line = first_line[:-1]
|
|
data = json.loads(first_line)
|
|
return {
|
|
"date": data[0],
|
|
"dimensions": {
|
|
"width": data[1],
|
|
"height": data[2],
|
|
},
|
|
}
|
|
|
|
def get_audio(self) -> Optional[AudioSlice]:
|
|
md = self.get_metadata()
|
|
if 'audio' not in md:
|
|
return None
|
|
if 'file' not in md['audio']:
|
|
return None
|
|
|
|
return AudioSlice(filename=os.path.join(self.basedir, md['audio']['file'][1:]), offset=md['audio']['offset']*1000)
|
|
|
|
def get_animation(self) -> AnimationSlice:
|
|
# with open(self.eventfile, "r") as fp:
|
|
strokes = []
|
|
with open(self.eventfile, "r") as fp:
|
|
events = json.loads("[" + fp.read() + "]")
|
|
for i, event in enumerate(events):
|
|
if i == 0:
|
|
# metadata on first line
|
|
pass
|
|
else:
|
|
if type(event) is list:
|
|
# ignore double metadatas, which appear when continuaing an existing drawing
|
|
continue
|
|
|
|
if event["event"] == "viewbox":
|
|
pass
|
|
if event["event"] == "stroke":
|
|
# points = []
|
|
# for i in range(int(len(stroke) / 4)):
|
|
# p = stroke[i*4:i*4+4]
|
|
# points.append([float(p[0]), float(p[1]), int(p[2]), float(p[3])])
|
|
strokes.append(
|
|
Stroke(
|
|
event["color"],
|
|
[Point.fromTuple(tuple(p))
|
|
for p in event["points"]],
|
|
)
|
|
)
|
|
return AnimationSlice(strokes, audioslice=self.get_audio())
|
|
|
|
def get_metadata(self):
|
|
canvas = self.get_canvas_metadata()
|
|
if os.path.exists(self.metadata_fn):
|
|
with open(self.metadata_fn, "r") as fp:
|
|
metadata = json.load(fp)
|
|
else:
|
|
metadata = {}
|
|
metadata["canvas"] = canvas
|
|
return metadata
|
|
|
|
def get_absolute_viewbox(self) -> Viewbox:
|
|
return self.get_animation().get_bounding_box()
|
|
|
|
|
|
class Viewbox:
|
|
def __init__(self, x: float, y: float, width: float, height: float):
|
|
self.x = x
|
|
self.y = y
|
|
self.width = width
|
|
self.height = height
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.x} {self.y} {self.width} {self.height}"
|
|
|
|
|
|
FrameIndex = tuple[int, int]
|
|
|
|
|
|
class AnimationSlice:
|
|
# either a whole drawing or the result of applying an annotation to a drawing (an excerpt)
|
|
# TODO rename to AnimationSlice to include audio as well
|
|
def __init__(
|
|
self, strokes: list[Stroke], t_in: float = 0, t_out: float = None, audioslice: AudioSlice = None
|
|
) -> None:
|
|
self.strokes = strokes
|
|
self.t_in = t_in
|
|
self.t_out = t_out
|
|
self.audio = audioslice
|
|
# TODO: Audio
|
|
|
|
def get_bounding_box(self) -> Viewbox:
|
|
min_x, max_x = float("inf"), float("-inf")
|
|
min_y, max_y = float("inf"), float("-inf")
|
|
|
|
for s in self.strokes:
|
|
for p in s.points:
|
|
if p.x < min_x:
|
|
min_x = p.x
|
|
if p.x > max_x:
|
|
max_x = p.x
|
|
if p.y < min_y:
|
|
min_y = p.y
|
|
if p.y > max_y:
|
|
max_y = p.y
|
|
return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)
|
|
|
|
def getSlice(self, t_in: float, t_out: float) -> AnimationSlice:
|
|
frame_in = self.getIndexForInPoint(t_in)
|
|
frame_out = self.getIndexForOutPoint(t_out)
|
|
strokes = self.getStrokeSlices(frame_in, frame_out)
|
|
audio = self.audio.getSlice(t_in, t_out) if self.audio else None
|
|
return AnimationSlice(strokes, t_in, t_out, audio)
|
|
|
|
def get_as_svg_dwg(self) -> svgwrite.Drawing:
|
|
box = self.get_bounding_box()
|
|
(_, fn) = tempfile.mkstemp(suffix='.svg', text=True)
|
|
dwg = svgwrite.Drawing(fn, size=(box.width, box.height))
|
|
dwg.viewbox(box.x, box.y, box.width, box.height)
|
|
self.add_to_dwg(dwg)
|
|
dwg.defs.add(dwg.style("path{stroke-width:1mm;stroke-linecap: round;}"))
|
|
return dwg
|
|
|
|
def get_as_svg(self) -> str:
|
|
dwg = self.get_as_svg_dwg()
|
|
fp = io.StringIO()
|
|
dwg.write(fp, pretty=True)
|
|
return fp.getvalue()
|
|
|
|
def add_to_dwg(self, dwg: SvgDrawing):
|
|
group = svgwrite.container.Group()
|
|
for stroke in self.strokes:
|
|
stroke.add_to_dwg(group)
|
|
dwg.add(group)
|
|
|
|
def getStrokeSlices(
|
|
self, index_in: FrameIndex, index_out: FrameIndex
|
|
) -> list[Stroke]:
|
|
"""Get list of Stroke/StrokeSlice based in in and out indexes
|
|
Based on annotation.js getStrokesSliceForPathRange(in_point, out_point)
|
|
"""
|
|
slices = []
|
|
for i in range(index_in[0], index_out[0] + 1):
|
|
try:
|
|
stroke = self.strokes[i]
|
|
except IndexError:
|
|
# out point can be Infinity. So interrupt whenever the end is reached
|
|
break
|
|
|
|
in_i = index_in[1] if index_in[0] == i else 0
|
|
out_i = index_out[1] if index_out[0] == i else len(
|
|
stroke.points) - 1
|
|
|
|
slices.append(StrokeSlice(stroke, in_i, out_i))
|
|
return slices
|
|
|
|
def getIndexForInPoint(self, ms) -> FrameIndex:
|
|
"""Get the frame index (path, point) based on the given time
|
|
The In point version (so the first index after ms)
|
|
Equal to annotations.js findPositionForTime(ms)
|
|
"""
|
|
path_i = 0
|
|
point_i = 0
|
|
for i, stroke in enumerate(self.strokes):
|
|
start_at = stroke.points[0].t
|
|
end_at = stroke.points[-1].t
|
|
if end_at < ms:
|
|
# certainly not the right point yet
|
|
continue
|
|
if start_at > ms:
|
|
path_i = i
|
|
point_i = 0
|
|
break # too far, so this is the first point after in point
|
|
else:
|
|
# our in-point is inbetween first and last of the stroke
|
|
# we are getting close, find the right point_i
|
|
path_i = i
|
|
for pi, point in enumerate(stroke.points):
|
|
point_i = pi
|
|
if point.t > ms:
|
|
break # stop when finding the next point after in point
|
|
break # done :-)
|
|
return (path_i, point_i)
|
|
|
|
def getIndexForOutPoint(self, ms) -> FrameIndex:
|
|
"""Get the frame index (path, point) based on the given time
|
|
The Out point version (so the last index before ms)
|
|
Equal to annotations.js findPositionForTime(ms)
|
|
"""
|
|
return self.getIndexForTime( ms)
|
|
|
|
def getIndexForTime(self, ms) -> FrameIndex:
|
|
"""Get the frame index (path, point) based on the given time
|
|
Equal to annotations.js findPositionForTime(ms)
|
|
"""
|
|
path_i = 0
|
|
point_i = 0
|
|
for i, stroke in enumerate(self.strokes):
|
|
start_at = stroke.points[0].t
|
|
end_at = stroke.points[-1].t
|
|
|
|
if start_at > ms:
|
|
break # too far
|
|
if end_at > ms:
|
|
# we are getting close, find the right point_i
|
|
path_i = i
|
|
for pi, point in enumerate(stroke.points):
|
|
if point.t > ms:
|
|
break # too far
|
|
point_i = pi
|
|
break # done :-)
|
|
else:
|
|
# in case this is our last path, stroe this as
|
|
# best option thus far
|
|
path_i = i
|
|
point_i = len(stroke.points) - 1
|
|
return (path_i, point_i)
|
|
|
|
|
|
class AudioSlice:
|
|
def __init__(self, filename: Filename, t_in: float = None, t_out: float = None, offset: float = None):
|
|
self.filename = filename
|
|
self.t_in = t_in # in ms
|
|
self.t_out = t_out # in ms
|
|
self.offset = offset # in ms
|
|
|
|
def getSlice(self, t_in: float, t_out: float) -> AnimationSlice:
|
|
return AudioSlice(self.filename, t_in, t_out, self.offset)
|
|
|
|
def export(self, format="mp3"):
|
|
"""Returns file descriptor of tempfile"""
|
|
# Opening file and extracting segment
|
|
song = AudioSegment.from_file(self.filename)
|
|
start = self.t_in - self.offset
|
|
end = self.t_out - self.offset
|
|
|
|
if start < 0 and end < 0:
|
|
extract = AudioSegment.silent(
|
|
duration=end-start, frame_rate=song.frame_rate)
|
|
else:
|
|
if start < 0:
|
|
preroll = AudioSegment.silent(
|
|
duration=start * -1, frame_rate=song.frame_rate)
|
|
start = 0
|
|
else:
|
|
preroll = None
|
|
if end > len(song):
|
|
postroll = AudioSegment.silent(
|
|
duration=end - len(song), frame_rate=song.frame_rate)
|
|
end = len(song) - 1
|
|
else:
|
|
postroll = None
|
|
|
|
extract = song[start: end]
|
|
if preroll:
|
|
extract = preroll + extract
|
|
if postroll:
|
|
extract += postroll
|
|
|
|
# Saving
|
|
return extract.export(None, format=format)
|
|
|
|
|
|
class AnnotationIndex:
|
|
def __init__(
|
|
self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
|
|
) -> None:
|
|
self.filename = filename
|
|
self.drawing_dir = drawing_dir
|
|
self.metadata_dir = metadata_dir
|
|
|
|
# disable disk cache because of glitches shelve.open(filename, writeback=True)
|
|
self.shelve = {}
|
|
|
|
def refresh(self):
|
|
logger.info("refreshing")
|
|
# reset the index
|
|
for key in list(self.shelve.keys()):
|
|
print(key)
|
|
del self.shelve[key]
|
|
|
|
self.shelve["_drawings"] = {
|
|
d.id: d
|
|
for d in [
|
|
Drawing(fn, self.metadata_dir, self.drawing_dir) for fn in self.get_drawing_filenames()
|
|
]
|
|
}
|
|
self.shelve['_tags'] = {}
|
|
self.shelve['_annotations'] = {}
|
|
|
|
drawing: Drawing
|
|
for drawing in self.shelve['_drawings'].values():
|
|
meta = drawing.get_metadata()
|
|
if 'annotations' not in meta:
|
|
continue
|
|
for ann in meta['annotations']:
|
|
annotation = Annotation(
|
|
ann['tag'], drawing, ann['t_in'], ann['t_out'])
|
|
self.shelve['_annotations'][annotation.id] = annotation
|
|
if annotation.tag not in self.shelve['_tags']:
|
|
self.shelve['_tags'][annotation.tag] = [annotation]
|
|
else:
|
|
self.shelve['_tags'][annotation.tag].append(
|
|
annotation
|
|
)
|
|
|
|
@property
|
|
def drawings(self) -> dict[str, Drawing]:
|
|
return self.shelve["_drawings"]
|
|
|
|
@property
|
|
def tags(self) -> dict[str, list[Annotation]]:
|
|
return self.shelve["_tags"]
|
|
|
|
@property
|
|
def annotations(self) -> dict[str, Annotation]:
|
|
return self.shelve["_annotations"]
|
|
|
|
def get_annotations(self, tag) -> list[Annotation]:
|
|
if tag not in self.tags:
|
|
return []
|
|
return self.tags[tag]
|
|
|
|
def get_drawing_names(self) -> list[str]:
|
|
return [
|
|
name[:-16]
|
|
for name in os.listdir(self.drawing_dir)
|
|
if name.endswith("json_appendable") and os.stat(os.path.join(self.drawing_dir, name)).st_size > 0
|
|
]
|
|
|
|
def get_drawing_filenames(self) -> list[Filename]:
|
|
return [
|
|
os.path.join(self.drawing_dir, f"{name}.json_appendable")
|
|
for name in self.get_drawing_names()
|
|
]
|
|
|
|
def __del__(self):
|
|
self.shelve.close()
|
|
|
|
|
|
# Point = tuple[float, float, float]
|
|
|
|
|
|
class Point:
|
|
def __init__(self, x: float, y: float, last: bool, t: float):
|
|
self.x = float(x)
|
|
self.y = float(y) # if y == 0 it can still be integer.... odd python
|
|
self.last = last
|
|
self.t = t
|
|
|
|
@classmethod
|
|
def fromTuple(cls, p: tuple[float, float, int, float]):
|
|
return cls(p[0], p[1], bool(p[2]), p[3])
|
|
|
|
def scaledToFit(self, dimensions: dict[str, float]) -> Point:
|
|
# TODO: change so that it actually scales to FIT dimensions
|
|
return Point(self.x, self.y, self.last, self.t)
|
|
|
|
|
|
Points = list[Point]
|
|
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]
|
|
|
|
|
|
class Stroke:
|
|
def __init__(self, color: str, points: Points) -> None:
|
|
self.color = color
|
|
self.points = points
|
|
|
|
def add_to_dwg(self, dwg: SvgDrawing):
|
|
path = svgwrite.path.Path(d=self.get_as_d()).stroke(
|
|
self.color, 1).fill("none")
|
|
dwg.add(path)
|
|
|
|
def get_bounding_box(self) -> Viewbox:
|
|
min_x, max_x = float("inf"), float("-inf")
|
|
min_y, max_y = float("inf"), float("-inf")
|
|
|
|
for p in self.points:
|
|
if p.x < min_x:
|
|
min_x = p.x
|
|
if p.x > max_x:
|
|
max_x = p.x
|
|
if p.y < min_y:
|
|
min_y = p.y
|
|
if p.y > max_y:
|
|
max_y = p.y
|
|
return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)
|
|
|
|
def get_as_d(self):
|
|
d = ""
|
|
prev_point = None
|
|
cmd = ""
|
|
for point in self.points:
|
|
if not prev_point:
|
|
# TODO multiply points by scalars for dimensions (height widht of drawing)
|
|
d += f'M{point.x:.6},{point.y:.6} '
|
|
cmd = 'M'
|
|
else:
|
|
if prev_point.last:
|
|
d += " m"
|
|
cmd = "m"
|
|
elif cmd != 'l':
|
|
d += ' l '
|
|
cmd = 'l'
|
|
diff_point = {
|
|
"x": point.x - prev_point.x,
|
|
"y": point.y - prev_point.y,
|
|
}
|
|
# TODO multiply points by scalars for dimensions (height widht of drawing)
|
|
d += f'{diff_point["x"]:.6},{diff_point["y"]:.6} '
|
|
prev_point = point
|
|
return d
|
|
|
|
|
|
class StrokeSlice(Stroke):
|
|
def __init__(self, stroke: Stroke, i_in: int = None, i_out: int = None) -> None:
|
|
self.stroke = stroke
|
|
self.i_in = 0 if i_in is None else i_in
|
|
self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out
|
|
|
|
def slice_id(self):
|
|
return f"{self.i_in}-{self.i_out}"
|
|
|
|
@property
|
|
def points(self) -> Points:
|
|
return self.stroke.points[self.i_in: self.i_out + 1]
|
|
|
|
@property
|
|
def color(self) -> str:
|
|
return self.stroke.color
|
|
|
|
|
|
def strokes2D(strokes):
|
|
# strokes to a d attribute for a path
|
|
d = ""
|
|
last_stroke = None
|
|
cmd = ""
|
|
for stroke in strokes:
|
|
if not last_stroke:
|
|
d += f"M{stroke[0]},{stroke[1]} "
|
|
cmd = 'M'
|
|
else:
|
|
if last_stroke[2] == 1:
|
|
d += " m"
|
|
cmd = 'm'
|
|
elif cmd != 'l':
|
|
d += ' l '
|
|
cmd = 'l'
|
|
|
|
rel_stroke = [stroke[0] - last_stroke[0],
|
|
stroke[1] - last_stroke[1]]
|
|
d += f"{rel_stroke[0]},{rel_stroke[1]} "
|
|
last_stroke = stroke
|
|
return d
|