chronodiagram/app/svganim/strokes.py

679 lines
23 KiB
Python

from __future__ import annotations
import asyncio
import copy
import json
from os import X_OK, PathLike
import os
import subprocess
from typing import Optional, Union
import shelve
from pydub import AudioSegment
import svgwrite
import tempfile
import io
import logging
logger = logging.getLogger('svganim.strokes')
Milliseconds = float
Seconds = float
class Annotation:
def __init__(self, tag: str, drawing: Drawing, t_in: Milliseconds, t_out: Milliseconds) -> None:
self.tag = tag
self.t_in = t_in
self.t_out = t_out
self.drawing = drawing
@property
def id(self) -> str:
return f'{self.drawing.id}:{self.tag}:{self.t_in}:{self.t_out}'
def getAnimationSlice(self) -> AnimationSlice:
return self.drawing.get_animation().getSlice(self.t_in, self.t_out)
def get_as_svg(self) -> str:
return self.getAnimationSlice().get_as_svg()
def getJsonUrl(self) -> str:
return self.drawing.get_url() + f"?t_in={self.t_in}&t_out={self.t_out}"
Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]
SliceId = [str, float, float]
class Drawing:
def __init__(self, filename: Filename, metadata_dir: Filename, basedir: Filename) -> None:
self.eventfile = filename
self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")
self.basedir = basedir
def get_url(self) -> str:
return f"/files/{self.id}"
def get_annotations_url(self) -> str:
return f"/annotations/{self.id}"
def get_canvas_metadata(self) -> list:
logger.info(f'metadata for {self.id}')
with open(self.eventfile, "r") as fp:
first_line = fp.readline().strip()
if first_line.endswith(","):
first_line = first_line[:-1]
data = json.loads(first_line)
return {
"date": data[0],
"dimensions": {
"width": data[1],
"height": data[2],
},
}
def get_audio(self) -> Optional[AudioSlice]:
md = self.get_metadata()
if 'audio' not in md:
return None
if 'file' not in md['audio']:
return None
return AudioSlice(filename=os.path.join(self.basedir, md['audio']['file'][1:]), drawing=self, offset=md['audio']['offset']*1000)
def get_animation(self) -> AnimationSlice:
# with open(self.eventfile, "r") as fp:
strokes = []
viewboxes = []
with open(self.eventfile, "r") as fp:
events = json.loads("[" + fp.read() + "]")
for i, event in enumerate(events):
if i == 0:
# metadata on first line, add as initial viewbox to slice
viewboxes.append(TimedViewbox(-float('Infinity'), 0, 0, event[1], event[2]))
else:
if type(event) is list:
# ignore double metadatas, which appear when continuaing an existing drawing
continue
if event["event"] == "viewbox":
viewboxes.extend([TimedViewbox(
b['t'], b['x'], b['y'], b['width'], b['height']) for b in event['viewboxes']])
if event["event"] == "stroke":
# points = []
# for i in range(int(len(stroke) / 4)):
# p = stroke[i*4:i*4+4]
# points.append([float(p[0]), float(p[1]), int(p[2]), float(p[3])])
strokes.append(
Stroke(
event["color"],
[Point.fromTuple(tuple(p))
for p in event["points"]],
)
)
return AnimationSlice([self.id, None, None], strokes, viewboxes, audioslice=self.get_audio())
def get_metadata(self):
canvas = self.get_canvas_metadata()
if os.path.exists(self.metadata_fn):
with open(self.metadata_fn, "r") as fp:
metadata = json.load(fp)
else:
metadata = {}
metadata["canvas"] = canvas
return metadata
def get_absolute_viewbox(self) -> Viewbox:
return self.get_animation().get_bounding_box()
class Viewbox:
def __init__(self, x: float, y: float, width: float, height: float):
self.x = x
self.y = y
self.width = width
self.height = height
def __str__(self) -> str:
return f"{self.x} {self.y} {self.width} {self.height}"
class TimedViewbox(Viewbox):
def __init__(self, time: Milliseconds, x: float, y: float, width: float, height: float):
super().__init__(x, y, width, height)
self.t = time
FrameIndex = tuple[int, int]
class AnimationSlice:
# either a whole drawing or the result of applying an annotation to a drawing (an excerpt)
# TODO rename to AnimationSlice to include audio as well
def __init__(
self, slice_id: SliceId, strokes: list[Stroke], viewboxes: list[TimedViewbox] = [], t_in: float = 0, t_out: float = None, audioslice: AudioSlice = None
) -> None:
self.id = slice_id
self.strokes = strokes
self.viewboxes = viewboxes
self.t_in = t_in
self.t_out = t_out
self.audio = audioslice
# TODO: Audio
def asDict(self) -> dict:
"""Can be used to json-ify the animation-slice
"""
# conversion necessary for when no t_in is given
boxes = [v.__dict__ for v in self.viewboxes]
for box in boxes:
if box['t'] == -float('Infinity'):
box['t'] = 0
drawing = {
"file": self.getUrl(),
"time": "-", # creation date
# dimensions of drawing canvas
"dimensions": [self.viewboxes[0].width, self.viewboxes[0].height],
"shape": [s.asDict() for s in self.strokes],
"viewboxes": boxes,
"bounding_box": self.get_bounding_box().__dict__,
"audio": self.getAudioDict() if self.audio else None
}
return drawing
def getAudioDict(self):
"""quick and dirty to not use audio.asDict(), but it avoids passing all around sorts of data"""
return {
"file": '/files/' + self.getUrl('.mp3'),
"offset": 0
# "offset": self.audio.offset / 1000
}
def getUrl(self, extension = '') -> str:
if not self.id[1] and not self.id[2]:
return self.id[0]
return self.id[0] + f"{extension}?t_in={self.t_in}&t_out={self.t_out}"
def get_bounding_box(self, stroke_thickness: float = 3.5) -> Viewbox:
"""Stroke_thickness 3.5 == 1mm. If it should not be considered, just set it to 0.
"""
if len(self.strokes) == 0:
# empty set
return Viewbox(0,0,0,0)
min_x, max_x = float("inf"), float("-inf")
min_y, max_y = float("inf"), float("-inf")
for s in self.strokes:
for p in s.points:
x1 = p.x - stroke_thickness/2
x2 = p.x + stroke_thickness/2
y1 = p.y - stroke_thickness/2
y2 = p.y + stroke_thickness/2
if x1 < min_x:
min_x = x1
if x2 > max_x:
max_x = x2
if y1 < min_y:
min_y = y1
if y2 > max_y:
max_y = y2
return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)
def getSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> AnimationSlice:
"""slice the slice. T in ms"""
frame_in = self.getIndexForInPoint(t_in)
frame_out = self.getIndexForOutPoint(t_out)
strokes = self.getStrokeSlices(frame_in, frame_out, t_in)
# TODO shift t of points with t_in
viewboxes = self.getViewboxesSlice(t_in, t_out)
print(viewboxes[0])
audio = self.audio.getSlice(t_in, t_out) if self.audio else None
return AnimationSlice([self.id[0], t_in, t_out], strokes, viewboxes, t_in, t_out, audio)
def get_as_svg_dwg(self) -> svgwrite.Drawing:
box = self.get_bounding_box()
(_, fn) = tempfile.mkstemp(suffix='.svg', text=True)
dwg = svgwrite.Drawing(fn, size=(box.width, box.height))
dwg.viewbox(box.x, box.y, box.width, box.height)
self.add_to_dwg(dwg)
dwg.defs.add(
dwg.style("path{stroke-width:1mm;stroke-linecap: round;}"))
return dwg
def get_as_svg(self) -> str:
dwg = self.get_as_svg_dwg()
fp = io.StringIO()
dwg.write(fp, pretty=True)
return fp.getvalue()
def add_to_dwg(self, dwg: SvgDrawing):
group = svgwrite.container.Group()
for stroke in self.strokes:
stroke.add_to_dwg(group)
dwg.add(group)
def getViewboxesSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> list[TimedViewbox]:
"""Extract the viewboxes for in- and outpoints.
If there's one before inpoint, move that to the t_in, so that animation starts at the right position
the slice is offset by t_in ms
"""
viewboxes = [] # Add single empty element, so that we can use viewboxes[0] later
lastbox = None
for viewbox in self.viewboxes:
if viewbox.t > t_out:
break
if viewbox.t <= t_in:
# make sure the first box is the last box from _before_ the slice
firstbox = TimedViewbox(
0, viewbox.x, viewbox.y, viewbox.width, viewbox.height)
if not len(viewboxes):
viewboxes.append(firstbox)
else:
viewboxes[0] = firstbox
continue
viewboxes.append(TimedViewbox(viewbox.t-t_in, viewbox.x, viewbox.y, viewbox.width, viewbox.height))
return viewboxes
def getStrokeSlices(
self, index_in: FrameIndex, index_out: FrameIndex, t_offset: Seconds = 0
) -> list[Stroke]:
"""Get list of Stroke/StrokeSlice based in in and out indexes
Based on annotation.js getStrokesSliceForPathRange(in_point, out_point)
"""
slices = []
for i in range(index_in[0], index_out[0] + 1):
try:
stroke = self.strokes[i]
except IndexError:
# out point can be Infinity. So interrupt whenever the end is reached
break
in_i = index_in[1] if index_in[0] == i else 0
out_i = index_out[1] if index_out[0] == i else len(
stroke.points) - 1
slices.append(StrokeSlice(stroke, in_i, out_i, t_offset))
return slices
def getIndexForInPoint(self, ms: Milliseconds) -> FrameIndex:
"""Get the frame index (path, point) based on the given time
The In point version (so the first index after ms)
Equal to annotations.js findPositionForTime(ms)
"""
path_i = 0
point_i = 0
for i, stroke in enumerate(self.strokes):
start_at = stroke.points[0].t
end_at = stroke.points[-1].t
if end_at < ms:
# certainly not the right point yet
continue
if start_at > ms:
path_i = i
point_i = 0
break # too far, so this is the first point after in point
else:
# our in-point is inbetween first and last of the stroke
# we are getting close, find the right point_i
path_i = i
for pi, point in enumerate(stroke.points):
point_i = pi
if point.t > ms:
break # stop when finding the next point after in point
break # done :-)
return (path_i, point_i)
def getIndexForOutPoint(self, ms: Milliseconds) -> FrameIndex:
"""Get the frame index (path, point) based on the given time
The Out point version (so the last index before ms)
Equal to annotations.js findPositionForTime(ms)
"""
return self.getIndexForTime(ms)
def getIndexForTime(self, ms: Milliseconds) -> FrameIndex:
"""Get the frame index (path, point) based on the given time
Equal to annotations.js findPositionForTime(ms)
"""
path_i = 0
point_i = 0
for i, stroke in enumerate(self.strokes):
start_at = stroke.points[0].t
end_at = stroke.points[-1].t
if start_at > ms:
break # too far
if end_at > ms:
# we are getting close, find the right point_i
path_i = i
for pi, point in enumerate(stroke.points):
if point.t > ms:
break # too far
point_i = pi
break # done :-)
else:
# in case this is our last path, stroe this as
# best option thus far
path_i = i
point_i = len(stroke.points) - 1
return (path_i, point_i)
audiocache = {}
class AudioSlice:
def __init__(self, filename: Filename, drawing: Drawing, t_in: Milliseconds = None, t_out: Milliseconds = None, offset: Milliseconds = None):
self.filename = filename
self.drawing = drawing
self.t_in = t_in # in ms
self.t_out = t_out # in ms
self.offset = offset # in ms TODO: use from self.drawing metadata
def getSlice(self, t_in: float, t_out: float) -> AnimationSlice:
return AudioSlice(self.filename, self.drawing, t_in, t_out, self.offset)
def asDict(self):
return {
"file": self.getUrl(),
# "offset": self.offset/1000
}
def getUrl(self):
fn = self.filename.replace("../files/audio", "/file/")
params = []
if self.t_in:
params.append(f"t_in={self.t_in}")
if self.t_out:
params.append(f"t_out={self.t_in}")
if len(params):
fn += "?" + "&".join(params)
return fn
async def export(self, format="mp3"):
"""Returns file descriptor of tempfile"""
# Opening file and extracting segment
start = int(self.t_in - self.offset) # millisecond precision is enough
end = int(self.t_out - self.offset) # millisecond precision is enough
# call ffmpeg directly, with given in and outpoint, so no unnecessary data is loaded, and no double conversion (e.g. ogg -> wav -> ogg ) is performed
out_f = io.BytesIO()
# build converter command to export
conversion_command = [
"ffmpeg",
'-ss', f"{start}ms",
'-to', f"{end}ms",
"-i", self.filename, # ss before input, so not whole file is loaded
]
conversion_command.extend([
"-f", format, '-', # to stdout
])
# read stdin / write stdout
logger.info("ffmpeg start")
proc = await asyncio.create_subprocess_exec(
*conversion_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL)
p_out, p_err = await proc.communicate()
logger.info("ffmpeg finished")
if proc.returncode != 0:
raise Exception(
"Encoding failed. ffmpeg/avlib returned error code: {0}\n\nCommand:{1}".format(
p.returncode, conversion_command))
out_f.write(p_out)
out_f.seek(0)
return out_f
# old way, use AudioSegment, easy but slow (reads whole ogg to wav, then export segment to ogg again)
# logger.info("loading audio")
# if self.filename in audiocache:
# song = audiocache[self.filename]
# else:
# song = AudioSegment.from_file(self.filename)
# audiocache[self.filename] = song
# logger.info("loaded audio")
# if start < 0 and end < 0:
# extract = AudioSegment.silent(
# duration=end-start, frame_rate=song.frame_rate)
# else:
# if start < 0:
# preroll = AudioSegment.silent(
# duration=start * -1, frame_rate=song.frame_rate)
# start = 0
# else:
# preroll = None
# if end > len(song):
# postroll = AudioSegment.silent(
# duration=end - len(song), frame_rate=song.frame_rate)
# end = len(song) - 1
# else:
# postroll = None
# extract = song[start: end]
# if preroll:
# extract = preroll + extract
# if postroll:
# extract += postroll
# # Saving
# return extract.export(None, format=format)
class AnnotationIndex:
def __init__(
self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
) -> None:
self.filename = filename
self.drawing_dir = drawing_dir
self.metadata_dir = metadata_dir
# disable disk cache because of glitches shelve.open(filename, writeback=True)
self.shelve = {}
def refresh(self):
logger.info("refreshing")
# reset the index
for key in list(self.shelve.keys()):
print(key)
del self.shelve[key]
self.shelve["_drawings"] = {
d.id: d
for d in [
Drawing(fn, self.metadata_dir, self.drawing_dir) for fn in self.get_drawing_filenames()
]
}
self.shelve['_tags'] = {}
self.shelve['_annotations'] = {}
drawing: Drawing
for drawing in self.shelve['_drawings'].values():
meta = drawing.get_metadata()
if 'annotations' not in meta:
continue
for ann in meta['annotations']:
annotation = Annotation(
ann['tag'], drawing, ann['t_in'], ann['t_out'])
self.shelve['_annotations'][annotation.id] = annotation
if annotation.tag not in self.shelve['_tags']:
self.shelve['_tags'][annotation.tag] = [annotation]
else:
self.shelve['_tags'][annotation.tag].append(
annotation
)
@property
def drawings(self) -> dict[str, Drawing]:
return self.shelve["_drawings"]
@property
def tags(self) -> dict[str, list[Annotation]]:
return self.shelve["_tags"]
@property
def annotations(self) -> dict[str, Annotation]:
return self.shelve["_annotations"]
def get_annotations(self, tag) -> list[Annotation]:
if tag not in self.tags:
return []
return self.tags[tag]
def get_drawing_names(self) -> list[str]:
return [
name[:-16]
for name in os.listdir(self.drawing_dir)
if name.endswith("json_appendable") and os.stat(os.path.join(self.drawing_dir, name)).st_size > 0
]
def get_drawing_filenames(self) -> list[Filename]:
return [
os.path.join(self.drawing_dir, f"{name}.json_appendable")
for name in self.get_drawing_names()
]
def __del__(self):
self.shelve.close()
# Point = tuple[float, float, float]
class Point:
def __init__(self, x: float, y: float, last: bool, t: Seconds):
self.x = float(x)
self.y = float(y) # if y == 0 it can still be integer.... odd python
self.last = last
self.t = t
@classmethod
def fromTuple(cls, p: tuple[float, float, int, float]):
return cls(p[0], p[1], bool(p[2]), p[3])
def scaledToFit(self, dimensions: dict[str, float]) -> Point:
# TODO: change so that it actually scales to FIT dimensions
return Point(self.x, self.y, self.last, self.t)
def asList(self) -> list:
return [self.x, self.y, 1 if self.last else 0, self.t]
Points = list[Point]
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]
class Stroke:
def __init__(self, color: str, points: Points) -> None:
self.color = color
self.points = points
def asDict(self) -> dict:
return {"color": self.color, "points": [p.asList() for p in self.points]}
def add_to_dwg(self, dwg: SvgDrawing):
path = svgwrite.path.Path(d=self.get_as_d()).stroke(
self.color, 1).fill("none")
dwg.add(path)
# def get_bounding_box(self) -> Viewbox:
# min_x, max_x = float("inf"), float("-inf")
# min_y, max_y = float("inf"), float("-inf")
# for p in self.points:
# if p.x < min_x:
# min_x = p.x
# if p.x > max_x:
# max_x = p.x
# if p.y < min_y:
# min_y = p.y
# if p.y > max_y:
# max_y = p.y
# return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)
def get_as_d(self):
d = ""
prev_point = None
cmd = ""
for point in self.points:
if not prev_point:
# TODO multiply points by scalars for dimensions (height widht of drawing)
d += f'M{point.x:.6},{point.y:.6} '
cmd = 'M'
else:
if prev_point.last:
d += " m"
cmd = "m"
elif cmd != 'l':
d += ' l '
cmd = 'l'
diff_point = {
"x": point.x - prev_point.x,
"y": point.y - prev_point.y,
}
# TODO multiply points by scalars for dimensions (height widht of drawing)
d += f'{diff_point["x"]:.6},{diff_point["y"]:.6} '
prev_point = point
return d
class StrokeSlice(Stroke):
def __init__(self, stroke: Stroke, i_in: int = None, i_out: int = None, t_offset: Seconds = 0) -> None:
self.stroke = stroke
self.i_in = 0 if i_in is None else i_in
self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out
# deepcopy points, because slices can be offset in time
self.points = copy.deepcopy(self.stroke.points[self.i_in: self.i_out + 1])
for p in self.points:
p.t -= t_offset
def slice_id(self):
return f"{self.i_in}-{self.i_out}"
# @property
# def points(self) -> Points:
# return self.stroke.points[self.i_in: self.i_out + 1]
@property
def color(self) -> str:
return self.stroke.color
def strokes2D(strokes):
# strokes to a d attribute for a path
d = ""
last_stroke = None
cmd = ""
for stroke in strokes:
if not last_stroke:
d += f"M{stroke[0]},{stroke[1]} "
cmd = 'M'
else:
if last_stroke[2] == 1:
d += " m"
cmd = 'm'
elif cmd != 'l':
d += ' l '
cmd = 'l'
rel_stroke = [stroke[0] - last_stroke[0],
stroke[1] - last_stroke[1]]
d += f"{rel_stroke[0]},{rel_stroke[1]} "
last_stroke = stroke
return d