
727 lines
25 KiB
Raw Normal View History

from __future__ import annotations
import asyncio
import copy
import json
from os import X_OK, PathLike
import os
import subprocess
from typing import Optional, Union
import shelve
from pydub import AudioSegment
import svgwrite
2022-01-19 09:28:51 +00:00
import tempfile
import io
import logging
2022-06-01 14:08:49 +00:00
from anytree import NodeMixin, RenderTree
from anytree.exporter import JsonExporter
from anytree.importer import JsonImporter, DictImporter
# Module-level logger; everything in this module logs to 'svganim.strokes'.
logger = logging.getLogger('svganim.strokes')
# Aliases that only document the unit a float is expressed in.
Milliseconds = float
Seconds = float
class Annotation:
    """A tagged time range (``t_in`` .. ``t_out``) within one drawing."""

    def __init__(self, tag: str, drawing: Drawing, t_in: Milliseconds, t_out: Milliseconds) -> None:
        self.tag = tag
        self.t_in = t_in
        self.t_out = t_out
        self.drawing = drawing

    @property
    def id(self) -> str:
        """Identifier combining the drawing id, the tag and both time points.

        NOTE(review): the first placeholder was empty in the damaged source;
        ``self.drawing.id`` is reconstructed -- confirm.
        """
        return f'{self.drawing.id}:{self.tag}:{self.t_in}:{self.t_out}'

    def getAnimationSlice(self) -> AnimationSlice:
        """Return the excerpt of the drawing's animation covered by this annotation."""
        return self.drawing.get_animation().getSlice(self.t_in, self.t_out)

    def get_as_svg(self) -> str:
        """Render the annotated excerpt as an SVG document string."""
        return self.getAnimationSlice().get_as_svg()

    def getJsonUrl(self) -> str:
        """Return the drawing url with this annotation's in/out points as query string."""
        return self.drawing.get_url() + f"?t_in={self.t_in}&t_out={self.t_out}"
# Anything that can be handed to open() / os.path as a path.
Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]
# Identifier of an AnimationSlice; slices are built with a 3-item list whose
# last two items are the in and out point (or None for a whole drawing).
# NOTE(review): this is a plain list of types, not a typing construct. It is
# only usable as an annotation because annotations are evaluated lazily.
SliceId = [str, float, float]
class Drawing:
    """A recorded drawing, backed by an event file and an optional metadata file.

    The event file is a comma separated list of JSON values: the first value
    is a ``[date, width, height]`` list, the following values are event
    objects with an ``"event"`` key (``"viewbox"`` or ``"stroke"``).
    """

    def __init__(self, filename: Filename, metadata_dir: Filename, basedir: Filename) -> None:
        self.eventfile = filename
        # NOTE(review): the damaged source fused this line with the previous
        # one; the id is the event file's basename without extension.
        self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
        self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")
        self.basedir = basedir

    def get_url(self) -> str:
        """Return the url under which the drawing's data is served."""
        return f"/files/{self.id}"

    def get_annotations_url(self) -> str:
        """Return the url of the drawing's annotations."""
        return f"/annotations/{self.id}"

    def get_canvas_metadata(self) -> dict:
        """Read the canvas metadata from the first line of the event file.

        Returns a dict with ``"date"`` and ``"dimensions"`` (width/height).
        """
        logger.info(f'metadata for {self.id}')
        with open(self.eventfile, "r") as fp:
            first_line = fp.readline().strip()

        # the line may end with the separator of the surrounding list
        if first_line.endswith(","):
            first_line = first_line[:-1]

        data = json.loads(first_line)
        return {
            "date": data[0],
            "dimensions": {
                "width": data[1],
                "height": data[2],
            },
        }

    def get_audio(self) -> Optional[AudioSlice]:
        """Return the audio belonging to the drawing, or None if the metadata names none."""
        md = self.get_metadata()
        if 'audio' not in md:
            return None
        if 'file' not in md['audio']:
            return None
        # The first character of the stored file name is dropped before it is
        # joined to basedir; the stored offset is multiplied by 1000.
        return AudioSlice(
            filename=os.path.join(self.basedir, md['audio']['file'][1:]),
            drawing=self,
            offset=md['audio']['offset'] * 1000,
        )

    def get_animation(self) -> AnimationSlice:
        """Parse the event file into an AnimationSlice covering the whole drawing."""
        strokes = []
        viewboxes = []
        with open(self.eventfile, "r") as fp:
            # NOTE(review): the middle operand was stripped from the damaged
            # source; fp.read() is reconstructed.
            events = json.loads("[" + fp.read() + "]")

        for i, event in enumerate(events):
            if i == 0:
                # metadata on first line, add as initial viewbox to slice
                viewboxes.append(TimedViewbox(-float('Infinity'), 0, 0, event[1], event[2]))
                continue

            if isinstance(event, list):
                # ignore double metadatas, which appear when continuing an existing drawing
                continue

            if event["event"] == "viewbox":
                viewboxes.extend([
                    TimedViewbox(b['t'], b['x'], b['y'], b['width'], b['height'])
                    for b in event['viewboxes']
                ])
            if event["event"] == "stroke":
                # NOTE(review): reconstructed; only the trailing
                # `for p in event["points"]],` survived in the damaged source.
                strokes.append(
                    Stroke(
                        event["color"],
                        [Point.fromTuple(tuple(p)) for p in event["points"]],
                    )
                )
        return AnimationSlice([self.id, None, None], strokes, viewboxes, audioslice=self.get_audio())

    def get_metadata(self):
        """Return the stored metadata (empty if there is no file) plus the canvas metadata."""
        canvas = self.get_canvas_metadata()
        if os.path.exists(self.metadata_fn):
            with open(self.metadata_fn, "r") as fp:
                metadata = json.load(fp)
        else:
            metadata = {}
        metadata["canvas"] = canvas
        return metadata

    def get_absolute_viewbox(self) -> Viewbox:
        """Return the bounding box of all strokes of the drawing."""
        return self.get_animation().get_bounding_box()
class Viewbox:
    """Rectangle described by its top-left corner and its size."""

    def __init__(self, x: float, y: float, width: float, height: float):
        self.x = x
        self.y = y
        self.width = width
        self.height = height

    def __str__(self) -> str:
        """Format as ``x y width height`` (the layout of an SVG viewBox value)."""
        return f"{self.x} {self.y} {self.width} {self.height}"


class TimedViewbox(Viewbox):
    """A Viewbox that additionally carries a time ``t``."""

    def __init__(self, time: Milliseconds, x: float, y: float, width: float, height: float):
        super().__init__(x, y, width, height)
        self.t = time
# (stroke index, point index) pair, as returned by the getIndexFor* methods.
FrameIndex = tuple[int, int]
class AnimationSlice:
    """Strokes, viewboxes and optional audio of a drawing or of an excerpt of one.

    Either a whole drawing or the result of applying an annotation to a
    drawing (an excerpt).

    NOTE(review): attribute names ``id`` and ``audio`` were stripped from the
    damaged source and are reconstructed -- confirm.
    """

    def __init__(
        self, slice_id: SliceId, strokes: list[Stroke], viewboxes: Optional[list[TimedViewbox]] = None, t_in: float = 0, t_out: float = None, audioslice: AudioSlice = None
    ) -> None:
        self.id = slice_id
        self.strokes = strokes
        # a fresh list per instance instead of a shared mutable default
        self.viewboxes = [] if viewboxes is None else viewboxes
        self.t_in = t_in
        self.t_out = t_out
        self.audio = audioslice

    def asDict(self) -> dict:
        """Return a plain dict that can be used to json-ify the animation slice.

        Raises IndexError when the slice has no viewboxes.
        """
        # Copy the attribute dicts so that the conversion below does not
        # alter the viewbox objects themselves.
        boxes = [dict(v.__dict__) for v in self.viewboxes]
        for box in boxes:
            # conversion necessary for when no t_in is given
            if box['t'] == -float('Infinity'):
                box['t'] = 0

        drawing = {
            "file": self.getUrl(),
            "time": "-",  # creation date
            # dimensions of drawing canvas
            "dimensions": [self.viewboxes[0].width, self.viewboxes[0].height],
            "shape": [s.asDict() for s in self.strokes],
            "viewboxes": boxes,
            "bounding_box": self.get_bounding_box().__dict__,
            "audio": self.getAudioDict() if self.audio else None,
        }
        return drawing

    def getAudioDict(self):
        """quick and dirty to not use audio.asDict(), but it avoids passing all around sorts of data"""
        return {
            "file": '/files/' + self.getUrl('.mp3'),
            "offset": 0,
            # "offset": self.audio.offset / 1000
        }

    def getUrl(self, extension = '') -> str:
        """Return the slice's url: the id, ``extension`` and, for excerpts, the time range."""
        if not self.id[1] and not self.id[2]:
            # NOTE(review): this return was missing from the damaged source;
            # whether the extension was appended here is an assumption.
            return self.id[0] + extension

        return self.id[0] + f"{extension}?t_in={self.t_in}&t_out={self.t_out}"

    def get_bounding_box(self, stroke_thickness: float = 3.5) -> Viewbox:
        """Return the box enclosing all points, grown by half the stroke thickness.

        Stroke_thickness 3.5 == 1mm. If it should not be considered, just set it to 0.
        """
        xs = [p.x for s in self.strokes for p in s.points]
        ys = [p.y for s in self.strokes for p in s.points]
        if not xs:
            # empty set
            return Viewbox(0, 0, 0, 0)

        half = stroke_thickness / 2
        min_x, max_x = min(xs) - half, max(xs) + half
        min_y, max_y = min(ys) - half, max(ys) + half
        return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)

    def getSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> AnimationSlice:
        """slice the slice. T in ms"""
        frame_in = self.getIndexForInPoint(t_in)
        frame_out = self.getIndexForOutPoint(t_out)
        strokes = self.getStrokeSlices(frame_in, frame_out, t_in)
        # TODO shift t of points with t_in
        viewboxes = self.getViewboxesSlice(t_in, t_out)
        audio = self.audio.getSlice(t_in, t_out) if self.audio else None
        return AnimationSlice([self.id[0], t_in, t_out], strokes, viewboxes, t_in, t_out, audio)

    def get_as_svg_dwg(self) -> svgwrite.Drawing:
        """Build an svgwrite drawing sized to the bounding box of the strokes."""
        box = self.get_bounding_box()
        (fd, fn) = tempfile.mkstemp(suffix='.svg', text=True)
        # mkstemp returns an open descriptor; only the name is needed here
        os.close(fd)
        dwg = svgwrite.Drawing(fn, size=(box.width, box.height))
        dwg.viewbox(box.x, box.y, box.width, box.height)
        self.add_to_dwg(dwg)
        dwg.defs.add(
            dwg.style("path{stroke-width:1mm;stroke-linecap: round;}"))
        return dwg

    def get_as_svg(self) -> str:
        """Return the slice rendered as an SVG document string."""
        dwg = self.get_as_svg_dwg()
        fp = io.StringIO()
        dwg.write(fp, pretty=True)
        return fp.getvalue()

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add all strokes, wrapped in one group, to ``dwg``."""
        group = svgwrite.container.Group()
        for stroke in self.strokes:
            stroke.add_to_dwg(group)
        dwg.add(group)

    def getViewboxesSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> list[TimedViewbox]:
        """Extract the viewboxes for in- and outpoints.

        If there's one before inpoint, move that to the t_in, so that animation starts at the right position
        the slice is offset by t_in ms
        """
        viewboxes = []
        for viewbox in self.viewboxes:
            if viewbox.t > t_out:
                break

            if viewbox.t <= t_in:
                # make sure the first box is the last box from _before_ the slice
                firstbox = TimedViewbox(
                    0, viewbox.x, viewbox.y, viewbox.width, viewbox.height)
                if not viewboxes:
                    viewboxes.append(firstbox)
                else:
                    viewboxes[0] = firstbox
                continue

            viewboxes.append(TimedViewbox(
                viewbox.t - t_in, viewbox.x, viewbox.y, viewbox.width, viewbox.height))
        return viewboxes

    def getStrokeSlices(
        self, index_in: FrameIndex, index_out: FrameIndex, t_offset: Seconds = 0
    ) -> list[Stroke]:
        """Get list of Stroke/StrokeSlice based in in and out indexes

        Based on annotation.js getStrokesSliceForPathRange(in_point, out_point)

        If either in point or out point is [None, None], return an empty set.
        """
        slices = []

        if index_in[0] is None and index_in[1] is None:
            # If no inpoint is set, in_point is after the last stroke
            return slices

        if index_out[0] is None and index_out[1] is None:
            # If no out point is set, out_point is before the last stroke
            return slices

        for i in range(index_in[0], index_out[0] + 1):
            try:
                stroke = self.strokes[i]
            except IndexError:
                # out point can be Infinity. So interrupt whenever the end is reached
                break
            # only the first and last stroke of the range are cut
            in_i = index_in[1] if index_in[0] == i else 0
            out_i = index_out[1] if index_out[0] == i else len(
                stroke.points) - 1

            slices.append(StrokeSlice(stroke, in_i, out_i, t_offset))
        return slices

    def getIndexForInPoint(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) based on the given time

        The In point version (so the first index after ms)
        Equal to annotations.js findPositionForTime(ms)

        Returns (None, None) when ms lies after the last stroke.
        """
        path_i = None
        point_i = None
        for i, stroke in enumerate(self.strokes):
            start_at = stroke.points[0].t
            end_at = stroke.points[-1].t
            if end_at < ms:
                # certainly not the right point yet
                continue
            if start_at > ms:
                path_i = i
                point_i = 0
                break  # too far, so this is the first point after in point

            # our in-point is inbetween first and last of the stroke
            # we are getting close, find the right point_i
            path_i = i
            for pi, point in enumerate(stroke.points):
                point_i = pi
                if point.t > ms:
                    break  # stop when finding the next point after in point
            break  # done :-)

        if path_i is None or point_i is None:
            logger.warning("in point after last stroke. Not sure if this works")

        return (path_i, point_i)

    def getIndexForOutPoint(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) based on the given time

        The Out point version (so the last index before ms)
        Equal to annotations.js findPositionForTime(ms)
        """
        return self.getIndexForTime(ms)

    def getIndexForTime(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) based on the given time

        Equal to annotations.js findPositionForTime(ms)

        Returns (None, None) when ms lies before the first stroke.
        """
        path_i = None
        point_i = None
        for i, stroke in enumerate(self.strokes):
            start_at = stroke.points[0].t
            end_at = stroke.points[-1].t
            if start_at > ms:
                break  # too far

            if end_at > ms:
                # we are getting close, find the right point_i
                path_i = i
                for pi, point in enumerate(stroke.points):
                    if point.t > ms:
                        break  # too far
                    point_i = pi
                break  # done :-)

            # in case this is our last path, store this as
            # best option thus far
            path_i = i
            point_i = len(stroke.points) - 1

        if path_i is None or point_i is None:
            logger.warning("OUT point after last stroke. Not sure if this works")

        return (path_i, point_i)
# Only referenced by the commented-out pydub code in AudioSlice.export, where
# it is keyed by file name.
audiocache = {}
class AudioSlice:
    """A time range (``t_in`` .. ``t_out``, in ms) of an audio file belonging to a drawing."""

    def __init__(self, filename: Filename, drawing: Drawing, t_in: Milliseconds = None, t_out: Milliseconds = None, offset: Milliseconds = None):
        self.filename = filename
        self.drawing = drawing
        self.t_in = t_in  # in ms
        self.t_out = t_out  # in ms
        self.offset = offset  # in ms TODO: use from self.drawing metadata

    def getSlice(self, t_in: float, t_out: float) -> AudioSlice:
        """Return a new AudioSlice of the same file and offset with the given in/out points."""
        return AudioSlice(self.filename, self.drawing, t_in, t_out, self.offset)

    def asDict(self):
        """Return a json-serialisable description of the slice."""
        return {
            "file": self.getUrl(),
            # "offset": self.offset/1000
        }

    def getUrl(self):
        """Return the url of the audio, with the in/out points (when set) as query string."""
        fn = self.filename.replace("../files/audio", "/file/")

        # NOTE(review): the two append lines were stripped from the damaged
        # source; the parameter names mirror the other urls in this module.
        params = []
        if self.t_in:
            params.append(f"t_in={self.t_in}")
        if self.t_out:
            params.append(f"t_out={self.t_out}")
        if params:
            fn += "?" + "&".join(params)
        return fn

    async def export(self, format="mp3"):
        """Encode the slice with ffmpeg and return it as an ``io.BytesIO``.

        Requires ``t_in``, ``t_out`` and ``offset`` to be set.
        Raises Exception when ffmpeg exits with a non-zero return code.
        """
        # Opening file and extracting segment
        start = int(self.t_in - self.offset)  # millisecond precision is enough
        end = int(self.t_out - self.offset)  # millisecond precision is enough

        # call ffmpeg directly, with given in and outpoint, so no unnecessary data is loaded, and no double conversion (e.g. ogg -> wav -> ogg ) is performed
        out_f = io.BytesIO()

        # build converter command to export
        # NOTE(review): the executable name was stripped from the damaged
        # source; "ffmpeg" is taken from the log and error messages.
        conversion_command = [
            "ffmpeg",
            '-ss', f"{start}ms",
            '-to', f"{end}ms",
            "-i", self.filename,  # ss before input, so not whole file is loaded
            "-f", format, '-',  # to stdout
        ]

        # read stdin / write stdout
        logger.info("ffmpeg start")
        # NOTE(review): the arguments of this call were stripped from the
        # damaged source; piping stdout/stderr is reconstructed.
        proc = await asyncio.create_subprocess_exec(
            *conversion_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        p_out, p_err = await proc.communicate()
        logger.info("ffmpeg finished")

        if proc.returncode != 0:
            raise Exception(
                "Encoding failed. ffmpeg/avlib returned error code: {0}\n\nCommand:{1}".format(
                    proc.returncode, conversion_command))

        out_f.write(p_out)
        out_f.seek(0)
        return out_f

        # old way, use AudioSegment, easy but slow (reads whole ogg to wav, then export segment to ogg again)
        # logger.info("loading audio")
        # if self.filename in audiocache:
        #     song = audiocache[self.filename]
        # else:
        #     song = AudioSegment.from_file(self.filename)
        #     audiocache[self.filename] = song
        # logger.info("loaded audio")

        # if start < 0 and end < 0:
        #     extract = AudioSegment.silent(
        #         duration=end-start, frame_rate=song.frame_rate)
        # else:
        #     if start < 0:
        #         preroll = AudioSegment.silent(
        #             duration=start * -1, frame_rate=song.frame_rate)
        #         start = 0
        #     else:
        #         preroll = None
        #     if end > len(song):
        #         postroll = AudioSegment.silent(
        #             duration=end - len(song), frame_rate=song.frame_rate)
        #         end = len(song) - 1
        #     else:
        #         postroll = None

        #     extract = song[start: end]
        #     if preroll:
        #         extract = preroll + extract
        #     if postroll:
        #         extract += postroll

        # # Saving
        # return extract.export(None, format=format)
class AnnotationIndex:
    """In-memory index of all drawings in a directory and their annotations.

    The index is filled by :meth:`refresh`; the ``drawings``, ``tags`` and
    ``annotations`` properties raise KeyError before it has run.
    """

    # file name suffix that marks a drawing's event file
    _SUFFIX = ".json_appendable"

    def __init__(
        self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
    ) -> None:
        self.filename = filename
        self.drawing_dir = drawing_dir
        self.metadata_dir = metadata_dir

        # disable disk cache because of glitches: shelve.open(filename, writeback=True)
        self.shelve = {}

    def refresh(self):
        """Rebuild the index from the drawing and metadata directories."""
        logger.info("refreshing")
        # reset the index
        for key in list(self.shelve.keys()):
            del self.shelve[key]

        # NOTE(review): the key expression was stripped from the damaged
        # source; keying by the drawing's id is reconstructed.
        self.shelve["_drawings"] = {
            d.id: d
            for d in [
                Drawing(fn, self.metadata_dir, self.drawing_dir) for fn in self.get_drawing_filenames()
            ]
        }
        self.shelve['_tags'] = {}
        self.shelve['_annotations'] = {}

        drawing: Drawing
        for drawing in self.shelve['_drawings'].values():
            meta = drawing.get_metadata()
            if 'annotations' not in meta:
                continue
            for ann in meta['annotations']:
                annotation = Annotation(
                    ann['tag'], drawing, ann['t_in'], ann['t_out'])
                self.shelve['_annotations'][annotation.id] = annotation
                if annotation.tag not in self.shelve['_tags']:
                    self.shelve['_tags'][annotation.tag] = [annotation]
                else:
                    self.shelve['_tags'][annotation.tag].append(annotation)

    @property
    def drawings(self) -> dict[str, Drawing]:
        """All indexed drawings."""
        return self.shelve["_drawings"]

    @property
    def tags(self) -> dict[str, list[Annotation]]:
        """Annotations grouped by their tag."""
        return self.shelve["_tags"]

    @property
    def annotations(self) -> dict[str, Annotation]:
        """All indexed annotations."""
        return self.shelve["_annotations"]

    def get_annotations(self, tag) -> list[Annotation]:
        """Return the annotations carrying ``tag`` (empty list if the tag is unknown)."""
        if tag not in self.tags:
            return []
        return self.tags[tag]

    def get_drawing_names(self) -> list[str]:
        """Return the names (file name without suffix) of all non-empty event files."""
        # NOTE(review): the element expression was stripped from the damaged
        # source; removing the suffix is the inverse of get_drawing_filenames().
        return [
            name[:-len(self._SUFFIX)]
            for name in os.listdir(self.drawing_dir)
            if name.endswith(self._SUFFIX) and os.stat(os.path.join(self.drawing_dir, name)).st_size > 0
        ]

    def get_drawing_filenames(self) -> list[Filename]:
        """Return the paths of all non-empty event files in the drawing directory."""
        return [
            os.path.join(self.drawing_dir, f"{name}{self._SUFFIX}")
            for name in self.get_drawing_names()
        ]

    def __del__(self):
        # Only a real shelve has close(); the in-memory dict used while the
        # disk cache is disabled does not.
        close = getattr(getattr(self, "shelve", None), "close", None)
        if close is not None:
            close()
# Point = tuple[float, float, float]
class Point:
    """A single timed point of a stroke.

    ``last`` marks the point as the final one of a line segment.
    """

    def __init__(self, x: float, y: float, last: bool, t: Seconds):
        self.x = float(x)
        self.y = float(y)  # if y == 0 it can still be integer.... odd python
        self.last = last
        self.t = t

    @classmethod
    def fromTuple(cls, p: tuple[float, float, int, float]):
        """Build a point from an ``(x, y, last, t)`` sequence; ``last`` is converted to bool."""
        return cls(p[0], p[1], bool(p[2]), p[3])

    def scaledToFit(self, dimensions: dict[str, float]) -> Point:
        """Return a copy of the point; ``dimensions`` is currently ignored."""
        # TODO: change so that it actually scales to FIT dimensions
        return Point(self.x, self.y, self.last, self.t)

    def asList(self) -> list:
        """Return ``[x, y, last, t]`` with ``last`` as 1 or 0."""
        return [self.x, self.y, 1 if self.last else 0, self.t]
# The list of points making up a stroke.
Points = list[Point]
# The svgwrite containers that the add_to_dwg() methods accept as target.
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]
class Stroke:
    """A coloured sequence of timed points."""

    def __init__(self, color: str, points: Points) -> None:
        self.color = color
        self.points = points

    def asDict(self) -> dict:
        """Return a json-serialisable dict with the colour and the points as lists."""
        return {"color": self.color, "points": [p.asList() for p in self.points]}

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add the stroke as an unfilled path to the given svgwrite container."""
        path = svgwrite.path.Path(d=self.get_as_d()).stroke(
            self.color, 1).fill("none")
        dwg.add(path)

    # def get_bounding_box(self) -> Viewbox:
    #     min_x, max_x = float("inf"), float("-inf")
    #     min_y, max_y = float("inf"), float("-inf")

    #     for p in self.points:
    #         if p.x < min_x:
    #             min_x = p.x
    #         if p.x > max_x:
    #             max_x = p.x
    #         if p.y < min_y:
    #             min_y = p.y
    #         if p.y > max_y:
    #             max_y = p.y
    #     return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)

    def get_as_d(self):
        """Return the points as the ``d`` attribute of an SVG path.

        The first point is an absolute ``M``; every following point is
        written relative to its predecessor, as ``m`` when the predecessor
        is flagged ``last`` and as ``l`` otherwise.
        """
        d = ""
        prev_point = None
        cmd = ""
        for point in self.points:
            if prev_point is None:
                # TODO multiply points by scalars for dimensions (height widht of drawing)
                d += f'M{point.x:.6},{point.y:.6} '
                cmd = 'M'
            else:
                if prev_point.last:
                    d += " m"
                    cmd = "m"
                elif cmd != 'l':
                    # the command letter is only written when it changes
                    d += ' l '
                    cmd = 'l'

                diff_point = {
                    "x": point.x - prev_point.x,
                    "y": point.y - prev_point.y,
                }
                # TODO multiply points by scalars for dimensions (height widht of drawing)
                d += f'{diff_point["x"]:.6},{diff_point["y"]:.6} '
            prev_point = point
        return d


class StrokeSlice(Stroke):
    """The points ``i_in`` .. ``i_out`` (inclusive) of a stroke, shifted in time."""

    def __init__(self, stroke: Stroke, i_in: int = None, i_out: int = None, t_offset: Seconds = 0) -> None:
        self.stroke = stroke
        self.i_in = 0 if i_in is None else i_in
        self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out

        # deepcopy points, because slices can be offset in time
        self.points = copy.deepcopy(self.stroke.points[self.i_in: self.i_out + 1])
        for p in self.points:
            p.t -= t_offset

    def slice_id(self):
        """Return ``"<i_in>-<i_out>"``."""
        return f"{self.i_in}-{self.i_out}"

    # @property
    # def points(self) -> Points:
    #     return self.stroke.points[self.i_in: self.i_out + 1]

    @property
    def color(self) -> str:
        """The colour of the stroke this slice was taken from."""
        return self.stroke.color
def strokes2D(strokes):
    """Turn a list of ``[x, y, last, ...]`` items into the ``d`` attribute of an SVG path.

    The first item becomes an absolute ``M``; every following item is written
    relative to its predecessor, as ``m`` when the predecessor's third element
    equals 1 and as ``l`` otherwise.
    """
    d = ""
    last_stroke = None
    cmd = ""
    for stroke in strokes:
        if last_stroke is None:
            d += f"M{stroke[0]},{stroke[1]} "
            cmd = 'M'
        else:
            if last_stroke[2] == 1:
                d += " m"
                cmd = 'm'
            elif cmd != 'l':
                # the command letter is only written when it changes
                d += ' l '
                cmd = 'l'

            rel_stroke = [stroke[0] - last_stroke[0],
                          stroke[1] - last_stroke[1]]
            d += f"{rel_stroke[0]},{rel_stroke[1]} "
        last_stroke = stroke
    return d
class Tag(NodeMixin):
    """A tag as a node in an anytree hierarchy.

    ``name`` falls back to ``id`` when it is not given.
    """

    def __init__(self, id, name = None, description = "", color = "black", parent=None, children=None):
        super().__init__()
        # NOTE(review): both assignment targets were stripped from the damaged
        # source; `self.id` / `self.name` are reconstructed from the
        # parameter names.
        self.id = id
        self.name = self.id if name is None else name
        self.color = color
        self.description = description
        self.parent = parent
        if children:
            self.children = children
class TagTree(NodeMixin):
    """An anytree node without attributes of its own, apart from parent and children."""

    def __init__(self, parent=None, children=None):
        super().__init__()
        self.parent = parent
        if children:
            self.children = children
# Small example tree, built as a module-level statement.
# NOTE(review): the positional arguments after the id bind to Tag's `name`
# and `description` parameters (1, 0 and 0, 2), which looks like leftover
# test data -- confirm.
my0 = Tag('root')
my1 = Tag('my1', 1, 0, parent=my0)
my2 = Tag('my2', 0, 2, parent=my0)
# Export the tree rooted at my0 with anytree's JsonExporter.
tree = JsonExporter(indent=2).export(my0)
with open('www/tags.json', 'r') as fp: