diff --git a/pyproject.toml b/pyproject.toml index d364da2..646625a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,8 @@ authors = ["Ruben van de Ven "] python = "^3.9" tornado = "^6.1" coloredlogs = "^15.0.1" +pydub = "^0.25.1" +svgwrite = "^1.4.1" [tool.poetry.dev-dependencies] diff --git a/svganim/__init__.py b/svganim/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/svganim/strokes.py b/svganim/strokes.py new file mode 100644 index 0000000..03582c2 --- /dev/null +++ b/svganim/strokes.py @@ -0,0 +1,364 @@ +from __future__ import annotations +import json +from os import X_OK, PathLike +import os +from webserver import strokes2D +from typing import Optional, Union +import shelve +from pydub import AudioSegment +import svgwrite + + +class Annotation: + def __init__(self, tag: str, drawing: Drawing, t_in: float, t_out: float) -> None: + self.tag = tag + self.t_in = t_in + self.t_out = t_out + self.drawing = drawing + + def getAnimationSlice(self) -> AnimationSlice: + return self.drawing.get_animation().getSlice(self.t_in, self.t_out) + + def get_as_svg(self): + anim = self.getAnimationSlice() + bb = anim.get_bounding_box() + dwg = svgwrite.Drawing('/tmp/test.svg', size=(bb.width, bb.height)) + anim.add_to_dwg(dwg) + dwg.save() + # TODO .... wip + print('saved!') + + + +Filename = Union[str, bytes, PathLike[str], PathLike[bytes]] + + +class Drawing: + def __init__(self, filename: Filename, metadata_dir: Filename) -> None: + self.eventfile = filename + self.id = os.path.splitext(os.path.basename(self.eventfile))[0] + self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json") + + def get_url(self) -> str: + return f"/files/{self.id}" + + def get_annotations_url(self) -> str: + return f"/annotations/{self.id}" + + def get_canvas_metadata(self) -> list: + with open(self.eventfile, "r") as fp: + first_line = fp.readline().strip() + + if first_line.endswith(","): + first_line = first_line[:-1] + data = json.loads(first_line) + return { + "date": data[0], + "dimensions": { + "width": data[1], + "height": data[2], + }, + } + + def get_animation(self) -> AnimationSlice: + # with open(self.eventfile, "r") as fp: + strokes = [] + with open(self.eventfile, "r") as fp: + events = json.loads("[" + fp.read() + "]") + for i, event in enumerate(events): + if i == 0: + # metadata on first line + pass + else: + if event["event"] == "viewbox": + pass + if event["event"] == "stroke": + # points = [] + # for i in range(int(len(stroke) / 4)): + # p = stroke[i*4:i*4+4] + # points.append([float(p[0]), float(p[1]), int(p[2]), float(p[3])]) + strokes.append( + Stroke( + event["color"], + [Point.fromTuple(tuple(p)) for p in event["points"]], + ) + ) + return AnimationSlice(strokes) + + def get_metadata(self): + canvas = self.get_canvas_metadata() + if os.path.exists(self.metadata_fn): + with open(self.metadata_fn, "r") as fp: + metadata = json.load(fp) + else: + metadata = {} + metadata["canvas"] = canvas + return metadata + + def get_absolute_viewbox(self) -> Viewbox: + return self.get_animation().get_bounding_box() + + +class Viewbox: + def __init__(self, x: float, y: float, width: float, height: float): + self.x = x + self.y = y + self.width = width + self.height = height + + def __str__(self) -> str: + return f"{self.x} {self.y} {self.width} {self.height}" + + +FrameIndex = tuple[int, int] + + +class AnimationSlice: + # either a whole drawing or the result of applying an annotation to a drawing (an excerpt) + # TODO rename to AnimationSlice to include audio as well + def __init__( + self, strokes: list[Stroke], t_in: float = 0, t_out: float = None + ) -> None: + self.strokes = strokes + self.t_in = t_in + self.t_out = t_out + # TODO: Audio + + def get_bounding_box(self) -> Viewbox: + min_x, max_x = float("inf"), float("-inf") + min_y, max_y = float("inf"), float("-inf") + + for s in self.strokes: + for p in s.points: + if p.x < min_x: + min_x = p.x + if p.x > max_x: + max_x = p.x + if p.y < min_y: + min_y = p.y + if p.y > max_y: + max_y = p.y + return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y) + + def getSlice(self, t_in: float, t_out: float) -> AnimationSlice: + frame_in = self.getIndexForTime(t_in) + frame_out = self.getIndexForTime(t_out) + strokes = self.getStrokeSlices(frame_in, frame_out) + return AnimationSlice(strokes, t_in, t_out) + + def add_to_dwg(self, dwg: SvgDrawing): + group = svgwrite.container.Group() + for stroke in self.strokes: + stroke.add_to_dwg(group) + + def getStrokeSlices( + self, index_in: FrameIndex, index_out: FrameIndex + ) -> list[Stroke]: + """Get list of Stroke/StrokeSlice based in in and out indexes + Based on annotation.js getStrokesSliceForPathRange(in_point, out_point) + """ + slices = [] + for i in range(index_in[0], index_out[0] + 1): + try: + stroke = self.strokes[i] + except IndexError: + # out point can be Infinity. So interrupt whenever the end is reached + break + + in_i = index_in[1] if index_in[0] == i else 0 + out_i = index_out[1] if index_out[0] == i else len(stroke.points) - 1 + + slices.append(StrokeSlice(stroke, in_i, out_i)) + return slices + + def getIndexForTime(self, ms) -> FrameIndex: + """Get the frame index (path, point) based on the given time + Equal to annotations.js findPositionForTime(ms) + """ + path_i = 0 + point_i = 0 + for i, stroke in enumerate(self.strokes): + start_at = stroke.points[0].t + end_at = stroke.points[-1].t + + if start_at > ms: + break # too far + if end_at > ms: + # we are getting close, find the right point_i + path_i = i + for pi, point in enumerate(stroke.points): + if point.t: + break # too far + point_i = pi + break # done :-) + else: + # in case this is our last path, stroe this as + # best option thus far + path_i = i + point_i = len(stroke.points) - 1 + return (path_i, point_i) + + +class AudioSlice: + def __init__(self, filename: Filename, t_in: float, t_out: float): + self.filename = filename + self.t_in = t_in # in ms + self.t_out = t_out # in ms + + def export(self, format="mp3"): + """Returns file descriptor of tempfile""" + # Opening file and extracting segment + song = AudioSegment.from_file(self.filename) + extract = song[self.t_in : self.t_out] + + # Saving + return extract.export(None, format=format) + + +class AnnotationIndex: + def __init__( + self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename + ) -> None: + self.filename = filename + self.drawing_dir = drawing_dir + self.metadata_dir = metadata_dir + + self.index = shelve.open(filename) + + def initiate(self): + # reset the index + for key in self.index: + del self.index[key] + + self.index["_drawings"] = { + d.id: d + for d in [ + Drawing(fn, self.metadata_dir) for fn in self.get_drawing_filenames() + ] + } + + drawing: Drawing + for drawing in self.index['_drawings'].values(): + meta = drawing.get_metadata() + if 'annotations' not in meta: + continue + for ann in meta['annotations']: + annotation = Annotation(ann['tag'], drawing, ann['t_in'], ann['t_out']) + if annotation.tag not in self.index: + self.index[annotation.tag] = [annotation] + else: + self.index[annotation.tag].append( + annotation + ) + + @property + def drawings(self) -> dict[str, Drawing]: + return self.index["_drawings"] + + def get(self, tag) -> list[Annotation]: + if tag not in self.index: + return [] + return self.index[tag] + + def get_drawing_names(self) -> list[str]: + return [ + name[:-16] + for name in os.listdir(self.drawing_dir) + if name.endswith("json_appendable") + ] + + def get_drawing_filenames(self) -> list[Filename]: + return [ + os.path.join(self.drawing_dir, f"{name}.json_appendable") + for name in self.get_drawing_names() + ] + + def __del__(self): + self.index.close() + + +# Point = tuple[float, float, float] + + +class Point: + def __init__(self, x: float, y: float, last: bool, t: float): + self.x = x + self.y = y + self.last = last + self.t = t + + @classmethod + def fromTuple(cls, p: tuple[float, float, int, float]): + return cls(p[0], p[1], bool(p[2]), p[3]) + + +Points = list[Point] +SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group] + +class Stroke: + def __init__(self, color: str, points: Points) -> None: + self.color = color + self.points = points + + def add_to_dwg(self, dwg: SvgDrawing): + path = svgwrite.path.Path(self.get_as_d).stroke(self.color,1).fill("none") + dwg.add(path) + + + def get_bounding_box(self) -> Viewbox: + min_x, max_x = float("inf"), float("-inf") + min_y, max_y = float("inf"), float("-inf") + + for p in self.points: + if p.x < min_x: + min_x = p.x + if p.x > max_x: + max_x = p.x + if p.y < min_y: + min_y = p.y + if p.y > max_y: + max_y = p.y + return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y) + + def get_as_d(self): + d = "" + prev_point = None + cmd = "" + for point in self.points: + if not prev_point: + # TODO multiply points by scalars for dimensions (height widht of drawing) + d += f'M{point.x:.6},{point.y:.6} ' + cmd = 'M' + else: + if prev_point.last: + d += " m" + cmd = "m" + elif cmd != 'l': + d += ' l ' + cmd = 'l' + diff_point = { + "x": point.x - prev_point.x, + "y": point.y - prev_point.y, + } + # TODO multiply points by scalars for dimensions (height widht of drawing) + d += f'{diff_point.x:.6},{diff_point.y:.6} ' + prev_point = point + return d + + +class StrokeSlice(Stroke): + def __init__(self, stroke: Stroke, i_in: int = None, i_out: int = None) -> None: + self.stroke = stroke + self.i_in = 0 if i_in is None else i_in + self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out + + def slice_id(self): + return f"{self.i_in}-{self.i_out}" + + @property + def points(self) -> Points: + return self.stroke.points[self.i_in : self.i_out + 1] + + @property + def color(self) -> str: + return self.stroke.color