from __future__ import annotations

import io
import json
import os
import shelve
import tempfile
from os import X_OK, PathLike
from typing import Optional, Union

import svgwrite
from pydub import AudioSegment


class Annotation:
    """A tagged time range (``t_in`` .. ``t_out``) within a single drawing."""

    def __init__(self, tag: str, drawing: Drawing, t_in: float, t_out: float) -> None:
        self.tag = tag
        self.t_in = t_in
        self.t_out = t_out
        self.drawing = drawing

    @property
    def id(self) -> str:
        """Identifier built from the drawing id, the tag and both time bounds."""
        return f'{self.drawing.id}:{self.tag}:{self.t_in}:{self.t_out}'

    def getAnimationSlice(self) -> AnimationSlice:
        """Load the drawing's animation and cut it down to this annotation's range."""
        return self.drawing.get_animation().getSlice(self.t_in, self.t_out)

    def get_as_svg(self) -> str:
        """Return the annotated excerpt rendered as an SVG document string."""
        return self.getAnimationSlice().get_as_svg()


Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]


class Drawing:
    """A drawing stored as an event file, plus an optional JSON metadata file."""

    def __init__(self, filename: Filename, metadata_dir: Filename, basedir: Filename) -> None:
        self.eventfile = filename
        # The id is the event file's base name without its extension.
        self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
        self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")
        self.basedir = basedir

    def get_url(self) -> str:
        """Return the URL path under which this drawing's file is addressed."""
        return f"/files/{self.id}"

    def get_annotations_url(self) -> str:
        """Return the URL path under which this drawing's annotations are addressed."""
        return f"/annotations/{self.id}"

    def get_canvas_metadata(self) -> dict:
        """Parse the first line of the event file into canvas metadata.

        The first line is read as a JSON array whose first three items are
        used as the date, the canvas width and the canvas height. A trailing
        comma on that line is stripped before parsing.
        """
        with open(self.eventfile, "r") as fp:
            first_line = fp.readline().strip()
        if first_line.endswith(","):
            first_line = first_line[:-1]
        data = json.loads(first_line)
        return {
            "date": data[0],
            "dimensions": {
                "width": data[1],
                "height": data[2],
            },
        }

    def get_audio(self) -> Optional[AudioSlice]:
        """Return the audio attached to this drawing, or None if there is none.

        Audio is considered present only when the metadata has an ``audio``
        entry that itself contains a ``file`` entry.
        """
        md = self.get_metadata()
        if 'audio' not in md:
            return None
        if 'file' not in md['audio']:
            return None
        # The first character of the stored file name is dropped (presumably a
        # leading slash -- verify against the metadata writer) so the name
        # joins below basedir. The offset is multiplied by 1000 because
        # AudioSlice works in milliseconds (presumably stored in seconds).
        return AudioSlice(
            filename=os.path.join(self.basedir, md['audio']['file'][1:]),
            offset=md['audio']['offset'] * 1000,
        )

    def get_animation(self) -> AnimationSlice:
        """Read every stroke event from the event file into an AnimationSlice.

        The file content is wrapped in ``[`` .. ``]`` and parsed as one JSON
        array. The first element is the canvas metadata and is skipped; of the
        remaining events only those with ``event == "stroke"`` contribute.
        Point coordinates are scaled by the canvas dimensions.
        """
        with open(self.eventfile, "r") as fp:
            events = json.loads("[" + fp.read() + "]")

        # Read the canvas dimensions once; looking them up per point would
        # re-open and re-parse the event file for every single point.
        dimensions = self.get_canvas_metadata()['dimensions']

        strokes = []
        for event in events[1:]:
            # "viewbox" events (and any other event type) carry no stroke data.
            if event["event"] != "stroke":
                continue
            strokes.append(
                Stroke(
                    event["color"],
                    [Point.fromTuple(tuple(p)).scaled(dimensions) for p in event["points"]],
                )
            )
        return AnimationSlice(strokes, audioslice=self.get_audio())

    def get_metadata(self):
        """Return the metadata dict, always including a fresh ``canvas`` entry.

        The JSON metadata file is used when it exists; otherwise the result
        contains only the canvas metadata.
        """
        canvas = self.get_canvas_metadata()
        if os.path.exists(self.metadata_fn):
            with open(self.metadata_fn, "r") as fp:
                metadata = json.load(fp)
        else:
            metadata = {}
        metadata["canvas"] = canvas
        return metadata

    def get_absolute_viewbox(self) -> Viewbox:
        """Return the bounding box of all strokes in the drawing."""
        return self.get_animation().get_bounding_box()


class Viewbox:
    """A rectangle given by its origin (x, y) and its width and height."""

    def __init__(self, x: float, y: float, width: float, height: float):
        self.x = x
        self.y = y
        self.width = width
        self.height = height

    def __str__(self) -> str:
        return f"{self.x} {self.y} {self.width} {self.height}"


FrameIndex = tuple[int, int]


def _bounding_box_of(points) -> Viewbox:
    """Return the smallest Viewbox containing every point of the iterable.

    For an empty iterable the extremes stay at +/- infinity, matching the
    behaviour of a plain min/max scan that never sees a point.
    """
    min_x, max_x = float("inf"), float("-inf")
    min_y, max_y = float("inf"), float("-inf")
    for p in points:
        if p.x < min_x:
            min_x = p.x
        if p.x > max_x:
            max_x = p.x
        if p.y < min_y:
            min_y = p.y
        if p.y > max_y:
            max_y = p.y
    return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)


class AnimationSlice:
    """Strokes (and optional audio) of a whole drawing or of an excerpt of it.

    An excerpt is the result of applying an annotation's time range to a
    drawing.
    """

    def __init__(
        self,
        strokes: list[Stroke],
        t_in: float = 0,
        t_out: Optional[float] = None,
        audioslice: Optional[AudioSlice] = None,
    ) -> None:
        self.strokes = strokes
        self.t_in = t_in
        self.t_out = t_out
        self.audio = audioslice

    def get_bounding_box(self) -> Viewbox:
        """Return the bounding box of all points of all strokes."""
        return _bounding_box_of(p for s in self.strokes for p in s.points)

    def getSlice(self, t_in: float, t_out: float) -> AnimationSlice:
        """Return the excerpt between ``t_in`` and ``t_out``.

        Both times are translated to (stroke, point) indexes; the audio, when
        present, is sliced to the same time range.
        """
        frame_in = self.getIndexForTime(t_in)
        frame_out = self.getIndexForTime(t_out)
        strokes = self.getStrokeSlices(frame_in, frame_out)
        audio = self.audio.getSlice(t_in, t_out) if self.audio else None
        return AnimationSlice(strokes, t_in, t_out, audio)

    def get_as_svg_dwg(self) -> svgwrite.Drawing:
        """Build an svgwrite Drawing sized and view-boxed to the bounding box.

        The drawing is given a freshly created temporary ``.svg`` file name.
        The temporary file itself is not removed here.
        """
        box = self.get_bounding_box()
        (fd, fn) = tempfile.mkstemp(suffix='.svg', text=True)
        # Only the file name is needed; close the descriptor so it does not leak.
        os.close(fd)
        dwg = svgwrite.Drawing(fn, size=(box.width, box.height))
        dwg.viewbox(box.x, box.y, box.width, box.height)
        self.add_to_dwg(dwg)
        return dwg

    def get_as_svg(self) -> str:
        """Return the slice as a pretty-printed SVG document string."""
        dwg = self.get_as_svg_dwg()
        fp = io.StringIO()
        dwg.write(fp, pretty=True)
        return fp.getvalue()

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add all strokes to ``dwg`` wrapped in a single SVG group."""
        group = svgwrite.container.Group()
        for stroke in self.strokes:
            stroke.add_to_dwg(group)
        dwg.add(group)

    def getStrokeSlices(
        self, index_in: FrameIndex, index_out: FrameIndex
    ) -> list[Stroke]:
        """Get the list of StrokeSlice objects between two frame indexes.

        Both indexes are (stroke index, point index) pairs and are inclusive.
        Based on annotation.js getStrokesSliceForPathRange(in_point, out_point).
        """
        slices = []
        for i in range(index_in[0], index_out[0] + 1):
            try:
                stroke = self.strokes[i]
            except IndexError:
                # The out index may lie beyond the last stroke; stop as soon
                # as the end of the stroke list is reached.
                break
            # Only the first and last stroke are cut; strokes in between are
            # taken whole.
            in_i = index_in[1] if index_in[0] == i else 0
            out_i = index_out[1] if index_out[0] == i else len(stroke.points) - 1
            slices.append(StrokeSlice(stroke, in_i, out_i))
        return slices

    def getIndexForTime(self, ms) -> FrameIndex:
        """Get the frame index (path, point) for the given time.

        Returns the last point whose timestamp does not exceed ``ms``. When
        ``ms`` lies before the first stroke the result is ``(0, 0)``; when it
        lies after (or between) strokes, the last point of the most recent
        stroke is returned.

        Equal to annotations.js findPositionForTime(ms).
        """
        path_i = 0
        point_i = 0
        for i, stroke in enumerate(self.strokes):
            start_at = stroke.points[0].t
            end_at = stroke.points[-1].t
            if start_at > ms:
                break  # this stroke starts too late; keep the previous result
            if end_at > ms:
                # ms falls inside this stroke: find the last point at or
                # before ms.
                path_i = i
                for pi, point in enumerate(stroke.points):
                    if point.t > ms:
                        break  # too far
                    point_i = pi
                break  # done
            # The stroke ends at or before ms: remember its last point as the
            # best option so far, in case no later stroke qualifies.
            path_i = i
            point_i = len(stroke.points) - 1
        return (path_i, point_i)


class AudioSlice:
    """A time range of an audio file; all times are in milliseconds."""

    def __init__(
        self,
        filename: Filename,
        t_in: Optional[float] = None,
        t_out: Optional[float] = None,
        offset: Optional[float] = None,
    ):
        self.filename = filename
        self.t_in = t_in  # in ms
        self.t_out = t_out  # in ms
        self.offset = offset  # in ms

    def getSlice(self, t_in: float, t_out: float) -> AudioSlice:
        """Return a new AudioSlice of the same file and offset for the given range."""
        return AudioSlice(self.filename, t_in, t_out, self.offset)

    def export(self, format="mp3"):
        """Extract the slice from the audio file and export it.

        ``t_in``, ``t_out`` and ``offset`` must all be set. The part of the
        requested range that falls before the start or after the end of the
        audio is filled with silence.

        Returns whatever ``AudioSegment.export`` returns when given no output
        target (per the original note: a handle to a temporary file).
        """
        # Opening file and extracting segment
        song = AudioSegment.from_file(self.filename)
        start = self.t_in - self.offset
        end = self.t_out - self.offset

        if start < 0 and end < 0:
            # The whole range lies before the audio starts: silence only.
            extract = AudioSegment.silent(duration=end - start, frame_rate=song.frame_rate)
        else:
            if start < 0:
                preroll = AudioSegment.silent(duration=start * -1, frame_rate=song.frame_rate)
                start = 0
            else:
                preroll = None
            if end > len(song):
                postroll = AudioSegment.silent(duration=end - len(song), frame_rate=song.frame_rate)
                end = len(song) - 1
            else:
                postroll = None

            extract = song[start: end]
            if preroll:
                extract = preroll + extract
            if postroll:
                extract += postroll

        # Saving
        return extract.export(None, format=format)


class AnnotationIndex:
    """Shelve-backed index of drawings, their annotations and the tags used."""

    def __init__(
        self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
    ) -> None:
        self.filename = filename
        self.drawing_dir = drawing_dir
        self.metadata_dir = metadata_dir
        # writeback=True keeps accessed entries cached so that in-place
        # mutations (as done in refresh) are written back on sync/close.
        self.shelve = shelve.open(filename, writeback=True)

    def refresh(self):
        """Rebuild the index from the drawing and metadata directories."""
        # Reset the index. Iterate over a snapshot of the keys, because
        # entries are deleted while looping.
        for key in list(self.shelve.keys()):
            del self.shelve[key]

        self.shelve["_drawings"] = {
            d.id: d
            for d in [
                Drawing(fn, self.metadata_dir, self.drawing_dir)
                for fn in self.get_drawing_filenames()
            ]
        }
        self.shelve['_tags'] = {}
        self.shelve['_annotations'] = {}

        drawing: Drawing
        for drawing in self.shelve['_drawings'].values():
            meta = drawing.get_metadata()
            if 'annotations' not in meta:
                continue
            for ann in meta['annotations']:
                annotation = Annotation(ann['tag'], drawing, ann['t_in'], ann['t_out'])
                self.shelve['_annotations'][annotation.id] = annotation
                # Group annotations per tag.
                self.shelve['_tags'].setdefault(annotation.tag, []).append(annotation)

    @property
    def drawings(self) -> dict[str, Drawing]:
        """Indexed drawings, keyed by drawing id."""
        return self.shelve["_drawings"]

    @property
    def tags(self) -> dict[str, list[Annotation]]:
        """Indexed annotations grouped by tag."""
        return self.shelve["_tags"]

    @property
    def annotations(self) -> dict[str, Annotation]:
        """Indexed annotations, keyed by annotation id."""
        return self.shelve["_annotations"]

    def get_annotations(self, tag) -> list[Annotation]:
        """Return the annotations for ``tag``, or an empty list for an unknown tag."""
        if tag not in self.tags:
            return []
        return self.tags[tag]

    def get_drawing_names(self) -> list[str]:
        """Return the names of the drawing files in the drawing directory.

        A name is the file name with its last 16 characters (the length of
        ``.json_appendable``) removed.
        """
        return [
            name[:-16]
            for name in os.listdir(self.drawing_dir)
            if name.endswith("json_appendable")
        ]

    def get_drawing_filenames(self) -> list[Filename]:
        """Return the path of every drawing file in the drawing directory."""
        return [
            os.path.join(self.drawing_dir, f"{name}.json_appendable")
            for name in self.get_drawing_names()
        ]

    def __del__(self):
        self.shelve.close()


class Point:
    """A single timed point of a stroke.

    ``last`` marks the point after which the path is interrupted: Stroke.get_as_d
    emits a move instead of a line for the point that follows it.
    """

    def __init__(self, x: float, y: float, last: bool, t: float):
        self.x = x
        self.y = y
        self.last = last
        self.t = t

    @classmethod
    def fromTuple(cls, p: tuple[float, float, int, float]):
        """Build a Point from an ``(x, y, last, t)`` tuple."""
        return cls(p[0], p[1], bool(p[2]), p[3])

    def scaled(self, dimensions: dict[str, float]) -> Point:
        """Return a copy with x and y multiplied by the given width and height."""
        return Point(
            self.x * dimensions['width'],
            self.y * dimensions['height'],
            self.last,
            self.t,
        )


Points = list[Point]
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]


class Stroke:
    """A coloured sequence of points, drawn as a single SVG path."""

    def __init__(self, color: str, points: Points) -> None:
        self.color = color
        self.points = points

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add this stroke to ``dwg`` as an unfilled path with stroke width 1."""
        path = svgwrite.path.Path(d=self.get_as_d()).stroke(self.color, 1).fill("none")
        dwg.add(path)

    def get_bounding_box(self) -> Viewbox:
        """Return the bounding box of this stroke's points."""
        return _bounding_box_of(self.points)

    def get_as_d(self):
        """Return the points as the ``d`` attribute of an SVG path.

        The first point is an absolute move (``M``); every following point is
        written relative to its predecessor, as a move (``m``) when the
        predecessor is flagged ``last`` and as a line (``l``) otherwise.
        """
        parts = []
        prev_point = None
        cmd = ""
        for point in self.points:
            if not prev_point:
                # TODO multiply points by scalars for dimensions (height width of drawing)
                parts.append(f'M{point.x:.6},{point.y:.6} ')
                cmd = 'M'
            else:
                if prev_point.last:
                    parts.append(" m")
                    cmd = "m"
                elif cmd != 'l':
                    # Only emit the command letter when switching to lines.
                    parts.append(' l ')
                    cmd = 'l'
                diff_point = {
                    "x": point.x - prev_point.x,
                    "y": point.y - prev_point.y,
                }
                # TODO multiply points by scalars for dimensions (height width of drawing)
                parts.append(f'{diff_point["x"]:.6},{diff_point["y"]:.6} ')
            prev_point = point
        return "".join(parts)


class StrokeSlice(Stroke):
    """A view on the points ``i_in`` .. ``i_out`` (inclusive) of another Stroke."""

    def __init__(self, stroke: Stroke, i_in: Optional[int] = None, i_out: Optional[int] = None) -> None:
        # Stroke.__init__ is deliberately not called: points and color are
        # read-only properties here and are derived from the wrapped stroke.
        self.stroke = stroke
        self.i_in = 0 if i_in is None else i_in
        self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out

    def slice_id(self):
        """Return an identifier of the form ``"<i_in>-<i_out>"``."""
        return f"{self.i_in}-{self.i_out}"

    @property
    def points(self) -> Points:
        """The wrapped stroke's points from ``i_in`` up to and including ``i_out``."""
        return self.stroke.points[self.i_in: self.i_out + 1]

    @property
    def color(self) -> str:
        """The colour of the wrapped stroke."""
        return self.stroke.color


def strokes2D(strokes):
    """Convert raw point sequences to the ``d`` attribute of an SVG path.

    Each item of ``strokes`` is indexed as ``[x, y, last, ...]``. The first
    item becomes an absolute move; every following item is written relative
    to its predecessor, as a move when the predecessor's third element is 1
    and as a line otherwise.
    """
    d = ""
    last_stroke = None
    cmd = ""
    for stroke in strokes:
        if not last_stroke:
            d += f"M{stroke[0]},{stroke[1]} "
            cmd = 'M'
        else:
            if last_stroke[2] == 1:
                d += " m"
                cmd = 'm'
            elif cmd != 'l':
                d += ' l '
                cmd = 'l'
            rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]]
            d += f"{rel_stroke[0]},{rel_stroke[1]} "
        last_stroke = stroke
    return d