"""Data model for recorded drawings, their stroke animations and annotations.

A drawing is stored as an "appendable JSON" event file.  This module reads
those files into ``Stroke``/``Point`` objects, slices them by time, renders
them to SVG path data and keeps a shelve-backed index of annotations by tag.
"""
from __future__ import annotations

import json
import os
import shelve
from os import X_OK, PathLike
from typing import Iterable, Optional, Union

import svgwrite
from pydub import AudioSegment

from webserver import strokes2D


class Annotation:
    """A tagged time range (``t_in``..``t_out``) within a drawing."""

    def __init__(self, tag: str, drawing: Drawing, t_in: float, t_out: float) -> None:
        self.tag = tag
        self.t_in = t_in
        self.t_out = t_out
        self.drawing = drawing

    def getAnimationSlice(self) -> AnimationSlice:
        """Return the part of the drawing's animation covered by this annotation."""
        return self.drawing.get_animation().getSlice(self.t_in, self.t_out)

    def get_as_svg(self, filename: str = "/tmp/test.svg"):
        """Render the annotated slice to an SVG file.

        Args:
            filename: Destination path of the SVG file.  Defaults to the
                location that used to be hard-coded.
        """
        anim = self.getAnimationSlice()
        bb = anim.get_bounding_box()
        dwg = svgwrite.Drawing(filename, size=(bb.width, bb.height))
        anim.add_to_dwg(dwg)
        dwg.save()
        # TODO .... wip
        print('saved!')


Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]


class Drawing:
    """A drawing backed by an event file plus an optional JSON metadata file."""

    def __init__(self, filename: Filename, metadata_dir: Filename) -> None:
        self.eventfile = filename
        # The id is the event file's base name without its extension.
        self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
        self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")

    def get_url(self) -> str:
        """Return the URL path under which the drawing's file is served."""
        return f"/files/{self.id}"

    def get_annotations_url(self) -> str:
        """Return the URL path of the drawing's annotations."""
        return f"/annotations/{self.id}"

    def get_canvas_metadata(self) -> dict:
        """Read date and canvas dimensions from the first line of the event file.

        Returns:
            A dict with the keys ``date`` and ``dimensions`` (``width``,
            ``height``), taken from the first three items of the first line.
        """
        with open(self.eventfile, "r") as fp:
            first_line = fp.readline().strip()
        # The first line may carry a trailing separator comma.
        if first_line.endswith(","):
            first_line = first_line[:-1]
        data = json.loads(first_line)
        return {
            "date": data[0],
            "dimensions": {
                "width": data[1],
                "height": data[2],
            },
        }

    def get_animation(self) -> AnimationSlice:
        """Parse the event file and return all its strokes as an AnimationSlice.

        The file content is wrapped in ``[...]`` to turn it into one JSON
        array.  The first item is the canvas metadata and is skipped; of the
        remaining events only those with ``event == "stroke"`` are used.
        """
        with open(self.eventfile, "r") as fp:
            raw = fp.read().rstrip()
        # Tolerate a trailing separator comma, like get_canvas_metadata() does
        # for the first line; it would otherwise make the array invalid JSON.
        if raw.endswith(","):
            raw = raw[:-1]
        events = json.loads("[" + raw + "]")

        strokes = [
            Stroke(
                event["color"],
                [Point.fromTuple(tuple(p)) for p in event["points"]],
            )
            for event in events[1:]  # events[0] is the metadata line
            if event["event"] == "stroke"
        ]
        return AnimationSlice(strokes)

    def get_metadata(self):
        """Return the stored metadata (if any) extended with a ``canvas`` entry."""
        canvas = self.get_canvas_metadata()
        if os.path.exists(self.metadata_fn):
            with open(self.metadata_fn, "r") as fp:
                metadata = json.load(fp)
        else:
            metadata = {}
        metadata["canvas"] = canvas
        return metadata

    def get_absolute_viewbox(self) -> Viewbox:
        """Return the bounding box of all points in the drawing."""
        return self.get_animation().get_bounding_box()


class Viewbox:
    """An axis-aligned rectangle; ``str()`` gives ``"x y width height"``."""

    def __init__(self, x: float, y: float, width: float, height: float):
        self.x = x
        self.y = y
        self.width = width
        self.height = height

    def __str__(self) -> str:
        return f"{self.x} {self.y} {self.width} {self.height}"


def _bounding_box(points: Iterable[Point]) -> Viewbox:
    """Return the smallest Viewbox containing all given points.

    For an empty iterable the extremes stay at +/- infinity, which is what
    the per-class implementations did before they were merged into this
    helper.
    """
    min_x = min_y = float("inf")
    max_x = max_y = float("-inf")
    for p in points:
        if p.x < min_x:
            min_x = p.x
        if p.x > max_x:
            max_x = p.x
        if p.y < min_y:
            min_y = p.y
        if p.y > max_y:
            max_y = p.y
    return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)


# (stroke index, point index within that stroke)
FrameIndex = tuple[int, int]


class AnimationSlice:
    """Either a whole drawing or an excerpt of it (an annotation applied to it)."""

    # TODO: include audio as well

    def __init__(
        self, strokes: list[Stroke], t_in: float = 0, t_out: Optional[float] = None
    ) -> None:
        self.strokes = strokes
        self.t_in = t_in
        self.t_out = t_out

    def get_bounding_box(self) -> Viewbox:
        """Return the bounding box of every point of every stroke."""
        return _bounding_box(p for s in self.strokes for p in s.points)

    def getSlice(self, t_in: float, t_out: float) -> AnimationSlice:
        """Return a new AnimationSlice limited to the time range t_in..t_out."""
        frame_in = self.getIndexForTime(t_in)
        frame_out = self.getIndexForTime(t_out)
        strokes = self.getStrokeSlices(frame_in, frame_out)
        return AnimationSlice(strokes, t_in, t_out)

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add all strokes to ``dwg``, wrapped in a single SVG group."""
        group = svgwrite.container.Group()
        for stroke in self.strokes:
            stroke.add_to_dwg(group)
        # Attach the group; without this the strokes never reach the drawing.
        dwg.add(group)

    def getStrokeSlices(
        self, index_in: FrameIndex, index_out: FrameIndex
    ) -> list[Stroke]:
        """Get list of Stroke/StrokeSlice based on in and out indexes.

        Based on annotation.js getStrokesSliceForPathRange(in_point, out_point)
        """
        slices = []
        for i in range(index_in[0], index_out[0] + 1):
            try:
                stroke = self.strokes[i]
            except IndexError:
                # The out index may lie past the last stroke; stop at the end.
                break

            # Only the first and last stroke of the range are cut partially.
            in_i = index_in[1] if index_in[0] == i else 0
            out_i = index_out[1] if index_out[0] == i else len(stroke.points) - 1

            slices.append(StrokeSlice(stroke, in_i, out_i))
        return slices

    def getIndexForTime(self, ms) -> FrameIndex:
        """Get the frame index (path, point) based on the given time.

        Equal to annotations.js findPositionForTime(ms)
        """
        path_i = 0
        point_i = 0
        for i, stroke in enumerate(self.strokes):
            points = stroke.points
            if not points:
                # A stroke without points has no timestamps to compare against.
                continue

            if points[0].t > ms:
                break  # this stroke starts after the requested time

            path_i = i
            if points[-1].t > ms:
                # The requested time falls within this stroke: take the last
                # point that is not later than ``ms``.
                for pi, point in enumerate(points):
                    if point.t > ms:
                        break  # too far
                    point_i = pi
                break  # done

            # Stroke ends before ``ms``: remember its last point as the best
            # option so far, in case no later stroke qualifies.
            point_i = len(points) - 1
        return (path_i, point_i)


class AudioSlice:
    """A time range of an audio file."""

    def __init__(self, filename: Filename, t_in: float, t_out: float):
        self.filename = filename
        self.t_in = t_in  # in ms
        self.t_out = t_out  # in ms

    def export(self, format="mp3"):
        """Export the t_in..t_out segment and return pydub's export result.

        No output target is passed to ``export``; the original note on this
        method states that a file descriptor of a tempfile is returned.
        """
        song = AudioSegment.from_file(self.filename)
        extract = song[self.t_in : self.t_out]
        return extract.export(None, format=format)


class AnnotationIndex:
    """Shelve-backed index of drawings and of their annotations by tag."""

    # Shelf key under which the {id: Drawing} mapping is stored.
    _DRAWINGS_KEY = "_drawings"
    # File name suffix that identifies a drawing's event file.
    _DRAWING_SUFFIX = ".json_appendable"

    def __init__(
        self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
    ) -> None:
        self.filename = filename
        self.drawing_dir = drawing_dir
        self.metadata_dir = metadata_dir

        self.index = shelve.open(filename)

    def initiate(self):
        """Reset the index and rebuild it from the drawing and metadata files.

        Raises:
            ValueError: If an annotation uses the reserved tag ``_drawings``.
        """
        # Snapshot the keys first: deleting while iterating the shelf itself
        # is not safe.
        for key in list(self.index.keys()):
            del self.index[key]

        drawings = {
            d.id: d
            for d in (
                Drawing(fn, self.metadata_dir) for fn in self.get_drawing_filenames()
            )
        }
        self.index[self._DRAWINGS_KEY] = drawings

        # Collect in memory first.  Without writeback, ``self.index[tag]``
        # returns a fresh unpickled copy each time, so appending to it would
        # never be stored.
        by_tag: dict[str, list[Annotation]] = {}
        for drawing in drawings.values():
            meta = drawing.get_metadata()
            for ann in meta.get('annotations', []):
                annotation = Annotation(ann['tag'], drawing, ann['t_in'], ann['t_out'])
                if annotation.tag == self._DRAWINGS_KEY:
                    raise ValueError(
                        f"annotation tag {self._DRAWINGS_KEY!r} is reserved"
                    )
                by_tag.setdefault(annotation.tag, []).append(annotation)

        for tag, annotations in by_tag.items():
            self.index[tag] = annotations

    @property
    def drawings(self) -> dict[str, Drawing]:
        """All indexed drawings, keyed by drawing id."""
        return self.index[self._DRAWINGS_KEY]

    def get(self, tag) -> list[Annotation]:
        """Return the annotations stored for ``tag``, or an empty list."""
        if tag not in self.index:
            return []
        return self.index[tag]

    def get_drawing_names(self) -> list[str]:
        """Return the names (file name minus suffix) of all drawing files."""
        suffix = self._DRAWING_SUFFIX
        # Match the dot as well, so that exactly the matched suffix is cut off.
        return [
            name[: -len(suffix)]
            for name in os.listdir(self.drawing_dir)
            if name.endswith(suffix)
        ]

    def get_drawing_filenames(self) -> list[Filename]:
        """Return the full paths of all drawing files in ``drawing_dir``."""
        return [
            os.path.join(self.drawing_dir, f"{name}.json_appendable")
            for name in self.get_drawing_names()
        ]

    def __del__(self):
        # ``index`` is missing when shelve.open() failed in __init__.
        index = getattr(self, "index", None)
        if index is not None:
            index.close()


class Point:
    """A single recorded point of a stroke.

    ``last`` marks the final point of a segment (the next point is reached
    with a move instead of a line, see ``Stroke.get_as_d``); ``t`` is the
    point's timestamp.
    """

    def __init__(self, x: float, y: float, last: bool, t: float):
        self.x = x
        self.y = y
        self.last = last
        self.t = t

    @classmethod
    def fromTuple(cls, p: tuple[float, float, int, float]):
        """Build a Point from an ``(x, y, last, t)`` tuple."""
        return cls(p[0], p[1], bool(p[2]), p[3])


Points = list[Point]
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]


class Stroke:
    """A coloured sequence of points."""

    def __init__(self, color: str, points: Points) -> None:
        self.color = color
        self.points = points

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add this stroke to ``dwg`` as an unfilled path in the stroke's colour."""
        # get_as_d must be called: Path expects the path data string itself.
        path = svgwrite.path.Path(self.get_as_d()).stroke(self.color, 1).fill("none")
        dwg.add(path)

    def get_bounding_box(self) -> Viewbox:
        """Return the bounding box of this stroke's points."""
        return _bounding_box(self.points)

    def get_as_d(self):
        """Return the stroke as SVG path data (the ``d`` attribute).

        The first point is an absolute ``M``; every later point is written
        as an offset relative to its predecessor, using ``m`` when the
        predecessor is flagged ``last`` and ``l`` otherwise.
        """
        parts = []
        prev_point = None
        cmd = ""
        for point in self.points:
            if prev_point is None:
                # TODO multiply points by scalars for dimensions (height width of drawing)
                # float() because the ``.6`` format spec rejects ints.
                parts.append(f'M{float(point.x):.6},{float(point.y):.6} ')
                cmd = 'M'
            else:
                if prev_point.last:
                    parts.append(" m")
                    cmd = "m"
                elif cmd != 'l':
                    parts.append(' l ')
                    cmd = 'l'

                dx = float(point.x - prev_point.x)
                dy = float(point.y - prev_point.y)
                # TODO multiply points by scalars for dimensions (height width of drawing)
                parts.append(f'{dx:.6},{dy:.6} ')
            prev_point = point
        return "".join(parts)


class StrokeSlice(Stroke):
    """A view on the points ``i_in``..``i_out`` (inclusive) of another stroke."""

    def __init__(
        self, stroke: Stroke, i_in: Optional[int] = None, i_out: Optional[int] = None
    ) -> None:
        # Stroke.__init__ is deliberately not called: ``points`` and ``color``
        # are read-only properties here and cannot be assigned.
        self.stroke = stroke
        self.i_in = 0 if i_in is None else i_in
        self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out

    def slice_id(self):
        """Return an identifier of the form ``"<i_in>-<i_out>"``."""
        return f"{self.i_in}-{self.i_out}"

    @property
    def points(self) -> Points:
        """The points of the underlying stroke within the slice range."""
        return self.stroke.points[self.i_in : self.i_out + 1]

    @property
    def color(self) -> str:
        """The colour of the underlying stroke."""
        return self.stroke.color