from __future__ import annotations
import asyncio
import copy
import json
from os import PathLike
import os
import subprocess
from typing import Optional, Union
import shelve
from pydub import AudioSegment
import svgwrite
import tempfile
import io
import logging
from anytree import NodeMixin, RenderTree, iterators
from anytree.exporter import JsonExporter
from anytree.importer import JsonImporter, DictImporter

logger = logging.getLogger('svganim.strokes')

Milliseconds = float
Seconds = float


class Annotation:
    def __init__(self, tag: str, drawing: Drawing, t_in: Milliseconds,
                 t_out: Milliseconds, comment: Optional[str] = None) -> None:
        self.tag = tag
        self.t_in = t_in
        self.t_out = t_out
        self.drawing = drawing
        self.comment = comment

    @property
    def id(self) -> str:
        return f'{self.drawing.id}:{self.tag}:{self.t_in}:{self.t_out}'

    def getAnimationSlice(self) -> AnimationSlice:
        return self.drawing.get_animation().getSlice(self.t_in, self.t_out)

    def get_as_svg(self) -> str:
        return self.getAnimationSlice().get_as_svg()

    def getJsonUrl(self) -> str:
        return self.drawing.get_url() + f"?t_in={self.t_in}&t_out={self.t_out}"


Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]
SliceId = tuple[str, Optional[float], Optional[float]]  # (drawing id, t_in, t_out)


class Drawing:
    def __init__(self, filename: Filename, metadata_dir: Filename, basedir: Filename) -> None:
        self.eventfile = filename
        self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
        self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")
        self.basedir = basedir

    def get_url(self) -> str:
        return f"/files/{self.id}"

    def get_annotations_url(self) -> str:
        return f"/annotations/{self.id}"

    def get_canvas_metadata(self) -> dict:
        logger.info(f'metadata for {self.id}')
        with open(self.eventfile, "r") as fp:
            first_line = fp.readline().strip()
            if first_line.endswith(","):
                first_line = first_line[:-1]
            data = json.loads(first_line)
            return {
                "date": data[0],
                "dimensions": {
                    "width": data[1],
                    "height": data[2],
                },
            }

    def get_audio(self) -> Optional[AudioSlice]:
        md = self.get_metadata()
        if 'audio' not in md:
            return None
        if 'file' not in md['audio']:
            return None
        return AudioSlice(
            filename=os.path.join(self.basedir, md['audio']['file'][1:]),
            drawing=self,
            offset=md['audio']['offset'] * 1000)

    def get_animation(self) -> AnimationSlice:
        # with open(self.eventfile, "r") as fp:
        strokes = []
        viewboxes = []
        with open(self.eventfile, "r") as fp:
            events = json.loads("[" + fp.read() + "]")

        for i, event in enumerate(events):
            if i == 0:
                # metadata on the first line: add it as the initial viewbox of the slice
                viewboxes.append(TimedViewbox(
                    -float('Infinity'), 0, 0, event[1], event[2]))
            else:
                if type(event) is list:
                    # ignore duplicate metadata, which appears when continuing an existing drawing
                    continue
                if event["event"] == "viewbox":
                    viewboxes.extend([
                        TimedViewbox(b['t'], b['x'], b['y'], b['width'], b['height'])
                        for b in event['viewboxes']])
                if event["event"] == "stroke":
                    # points = []
                    # for i in range(int(len(stroke) / 4)):
                    #     p = stroke[i*4:i*4+4]
                    #     points.append([float(p[0]), float(p[1]), int(p[2]), float(p[3])])
                    strokes.append(
                        Stroke(
                            event["color"],
                            [Point.fromTuple(tuple(p)) for p in event["points"]],
                        )
                    )
        return AnimationSlice(
            (self.id, None, None), strokes, viewboxes, audioslice=self.get_audio())

    def get_metadata(self):
        canvas = self.get_canvas_metadata()
        if os.path.exists(self.metadata_fn):
            with open(self.metadata_fn, "r") as fp:
                metadata = json.load(fp)
        else:
            metadata = {}
        metadata["canvas"] = canvas
        return metadata

    def get_absolute_viewbox(self) -> Viewbox:
        return self.get_animation().get_bounding_box()
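

# Illustrative sketch (not part of the original module): a typical way to combine
# Drawing and Annotation to export an SVG excerpt. The paths, tag and time values
# below are hypothetical placeholders.
def _example_export_annotation_svg() -> str:
    drawing = Drawing(
        "files/some-drawing.json_appendable",  # hypothetical event log
        metadata_dir="files/metadata",         # hypothetical metadata directory
        basedir="files",
    )
    # an annotation is a tagged time window (in ms) on a drawing
    annotation = Annotation("sketch", drawing, t_in=1_000, t_out=5_000)
    return annotation.get_as_svg()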


class Viewbox:
    def __init__(self, x: float, y: float, width: float, height: float):
        self.x = x
        self.y = y
        self.width = width
        self.height = height

    def __str__(self) -> str:
        return f"{self.x} {self.y} {self.width} {self.height}"


class TimedViewbox(Viewbox):
    def __init__(self, time: Milliseconds, x: float, y: float, width: float, height: float):
        super().__init__(x, y, width, height)
        self.t = time


FrameIndex = tuple[int, int]


class AnimationSlice:
    # either a whole drawing or the result of applying an annotation to a drawing (an excerpt)
    # TODO: rename, since a slice includes audio as well
    def __init__(
        self,
        slice_id: SliceId,
        strokes: list[Stroke],
        viewboxes: Optional[list[TimedViewbox]] = None,
        t_in: float = 0,
        t_out: Optional[float] = None,
        audioslice: Optional[AudioSlice] = None
    ) -> None:
        self.id = slice_id
        self.strokes = strokes
        # avoid a shared mutable default argument
        self.viewboxes = viewboxes if viewboxes is not None else []
        self.t_in = t_in
        self.t_out = t_out
        self.audio = audioslice
        # TODO: Audio

    def asDict(self) -> dict:
        """Can be used to json-ify the animation slice."""
        # conversion necessary for when no t_in is given;
        # copy the dicts so the original viewboxes are not mutated
        boxes = [dict(v.__dict__) for v in self.viewboxes]
        for box in boxes:
            if box['t'] == -float('Infinity'):
                box['t'] = 0

        drawing = {
            "file": self.getUrl(),
            "time": "-",  # creation date
            # dimensions of the drawing canvas
            "dimensions": [self.viewboxes[0].width, self.viewboxes[0].height],
            "shape": [s.asDict() for s in self.strokes],
            "viewboxes": boxes,
            "bounding_box": self.get_bounding_box().__dict__,
            "audio": self.getAudioDict() if self.audio else None
        }
        return drawing

    def getAudioDict(self):
        """Quick and dirty alternative to audio.asDict(), which avoids passing around all sorts of data."""
        return {
            "file": '/files/' + self.getUrl('.mp3'),
            "offset": 0
            # "offset": self.audio.offset / 1000
        }

    def getUrl(self, extension='') -> str:
        if not self.id[1] and not self.id[2]:
            return self.id[0]
        return self.id[0] + f"{extension}?t_in={self.t_in}&t_out={self.t_out}"

    def get_bounding_box(self, stroke_thickness: float = 3.5) -> Viewbox:
        """A stroke_thickness of 3.5 equals 1mm. Set it to 0 if it should not be considered."""
        if len(self.strokes) == 0:
            # empty set
            return Viewbox(0, 0, 0, 0)

        min_x, max_x = float("inf"), float("-inf")
        min_y, max_y = float("inf"), float("-inf")
        for s in self.strokes:
            for p in s.points:
                x1 = p.x - stroke_thickness / 2
                x2 = p.x + stroke_thickness / 2
                y1 = p.y - stroke_thickness / 2
                y2 = p.y + stroke_thickness / 2
                if x1 < min_x:
                    min_x = x1
                if x2 > max_x:
                    max_x = x2
                if y1 < min_y:
                    min_y = y1
                if y2 > max_y:
                    max_y = y2
        return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)
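
    # How slicing works: a FrameIndex is a (stroke index, point index) pair into
    # self.strokes. getIndexForInPoint()/getIndexForOutPoint() translate a time in
    # ms into such a pair, and getStrokeSlices() uses the two resulting pairs to
    # cut out the strokes that fall (partly) inside the requested time window.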

    def getSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> AnimationSlice:
        """Slice the slice. Times are in ms."""
        frame_in = self.getIndexForInPoint(t_in)
        frame_out = self.getIndexForOutPoint(t_out)
        strokes = self.getStrokeSlices(frame_in, frame_out, t_in)  # TODO shift t of points with t_in
        viewboxes = self.getViewboxesSlice(t_in, t_out)
        audio = self.audio.getSlice(t_in, t_out) if self.audio else None
        return AnimationSlice((self.id[0], t_in, t_out), strokes, viewboxes, t_in, t_out, audio)

    def get_as_svg_dwg(self) -> svgwrite.Drawing:
        box = self.get_bounding_box()
        (_, fn) = tempfile.mkstemp(suffix='.svg', text=True)
        dwg = svgwrite.Drawing(fn, size=(box.width, box.height))
        dwg.viewbox(box.x, box.y, box.width, box.height)
        self.add_to_dwg(dwg)
        dwg.defs.add(
            dwg.style("path{stroke-width:1mm;stroke-linecap: round;}"))
        return dwg

    def get_as_svg(self) -> str:
        dwg = self.get_as_svg_dwg()
        fp = io.StringIO()
        dwg.write(fp, pretty=True)
        return fp.getvalue()

    def add_to_dwg(self, dwg: SvgDrawing):
        group = svgwrite.container.Group()
        for stroke in self.strokes:
            stroke.add_to_dwg(group)
        dwg.add(group)

    def getViewboxesSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> list[TimedViewbox]:
        """Extract the viewboxes between the in- and out-points.

        If there is a viewbox before the in-point, move it to t_in, so that the
        animation starts at the right position. The slice is offset by t_in ms.
        """
        viewboxes = []

        # Add single empty element, so that we can use viewboxes[0] later
        lastbox = None
        for viewbox in self.viewboxes:
            if viewbox.t > t_out:
                break
            if viewbox.t <= t_in:
                # make sure the first box is the last box from _before_ the slice
                firstbox = TimedViewbox(
                    0, viewbox.x, viewbox.y, viewbox.width, viewbox.height)
                if not len(viewboxes):
                    viewboxes.append(firstbox)
                else:
                    viewboxes[0] = firstbox
                continue
            viewboxes.append(TimedViewbox(
                viewbox.t - t_in, viewbox.x, viewbox.y, viewbox.width, viewbox.height))
        return viewboxes

    def getStrokeSlices(
        self, index_in: FrameIndex, index_out: FrameIndex, t_offset: Seconds = 0
    ) -> list[Stroke]:
        """Get a list of Stroke/StrokeSlice objects based on the in and out indexes.

        Based on annotation.js getStrokesSliceForPathRange(in_point, out_point).
        If either the in-point or the out-point is (None, None), return an empty list.
        """
        slices = []
        if index_in[0] is None and index_in[1] is None:
            # If no in-point is set, in_point is after the last stroke
            return slices
        if index_out[0] is None and index_out[1] is None:
            # If no out-point is set, out_point is before the last stroke
            return slices
        for i in range(index_in[0], index_out[0] + 1):
            try:
                stroke = self.strokes[i]
            except IndexError:
                # the out point can be Infinity, so interrupt whenever the end is reached
                break

            in_i = index_in[1] if index_in[0] == i else 0
            out_i = index_out[1] if index_out[0] == i else len(stroke.points) - 1
            slices.append(StrokeSlice(stroke, in_i, out_i, t_offset))
        return slices

    def getIndexForInPoint(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) for the given time.

        The in-point version (so the first index after ms).
        Equal to annotations.js findPositionForTime(ms).
        """
        path_i = None
        point_i = None
        for i, stroke in enumerate(self.strokes):
            start_at = stroke.points[0].t
            end_at = stroke.points[-1].t
            if end_at < ms:
                # certainly not the right point yet
                continue
            if start_at > ms:
                path_i = i
                point_i = 0
                break  # too far, so this is the first point after the in-point
            else:
                # our in-point lies between the first and last point of this stroke;
                # we are getting close, find the right point_i
                path_i = i
                for pi, point in enumerate(stroke.points):
                    point_i = pi
                    if point.t > ms:
                        break  # stop when finding the next point after the in-point
                break  # done :-)
        if path_i is None or point_i is None:
            logger.warning("in point after last stroke. Not sure if this works")
        return (path_i, point_i)

    def getIndexForOutPoint(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) for the given time.

        The out-point version (so the last index before ms).
        Equal to annotations.js findPositionForTime(ms).
        """
        return self.getIndexForTime(ms)

    def getIndexForTime(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) for the given time.

        Equal to annotations.js findPositionForTime(ms).
        """
        path_i = None
        point_i = None
        for i, stroke in enumerate(self.strokes):
            start_at = stroke.points[0].t
            end_at = stroke.points[-1].t
            if start_at > ms:
                break  # too far
            if end_at > ms:
                # we are getting close, find the right point_i
                path_i = i
                for pi, point in enumerate(stroke.points):
                    if point.t > ms:
                        break  # too far
                    point_i = pi
                break  # done :-)
            else:
                # in case this is our last path, store it as
                # the best option thus far
                path_i = i
                point_i = len(stroke.points) - 1
        if path_i is None or point_i is None:
            logger.warning("OUT point after last stroke. Not sure if this works")
        return (path_i, point_i)
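

# Illustrative sketch (not part of the original module): cutting a time window out
# of a full recording and serialising it for the JSON API. The window bounds are
# arbitrary placeholders.
def _example_slice_to_dict(drawing: Drawing) -> dict:
    animation = drawing.get_animation()          # the full recording as an AnimationSlice
    excerpt = animation.getSlice(2_000, 10_000)  # 2s..10s window, times in ms
    return excerpt.asDict()                      # JSON-serialisable dict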


audiocache = {}


class AudioSlice:
    def __init__(self, filename: Filename, drawing: Drawing, t_in: Optional[Milliseconds] = None,
                 t_out: Optional[Milliseconds] = None, offset: Optional[Milliseconds] = None):
        self.filename = filename
        self.drawing = drawing
        self.t_in = t_in  # in ms
        self.t_out = t_out  # in ms
        self.offset = offset  # in ms TODO: use from self.drawing metadata

    def getSlice(self, t_in: float, t_out: float) -> AudioSlice:
        return AudioSlice(self.filename, self.drawing, t_in, t_out, self.offset)

    def asDict(self):
        return {
            "file": self.getUrl(),
            # "offset": self.offset/1000
        }

    def getUrl(self):
        fn = self.filename.replace("../files/audio", "/file/")
        params = []
        if self.t_in:
            params.append(f"t_in={self.t_in}")
        if self.t_out:
            params.append(f"t_out={self.t_out}")
        if len(params):
            fn += "?" + "&".join(params)
        return fn

    async def export(self, format="mp3"):
        """Return a file-like object (BytesIO) with the encoded audio segment."""
        # Opening file and extracting segment
        start = int(self.t_in - self.offset)  # millisecond precision is enough
        end = int(self.t_out - self.offset)  # millisecond precision is enough

        # call ffmpeg directly with the given in- and out-points, so no unnecessary data is
        # loaded and no double conversion (e.g. ogg -> wav -> ogg) is performed
        out_f = io.BytesIO()

        # build converter command to export
        conversion_command = [
            "ffmpeg",
            '-ss', f"{start}ms",
            '-to', f"{end}ms",
            "-i", self.filename,  # -ss before the input, so the whole file is not loaded
        ]

        conversion_command.extend([
            "-f", format,
            '-',  # to stdout
        ])

        # read stdin / write stdout
        logger.info("ffmpeg start")
        proc = await asyncio.create_subprocess_exec(
            *conversion_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL)
        p_out, p_err = await proc.communicate()
        logger.info("ffmpeg finished")

        if proc.returncode != 0:
            raise Exception(
                "Encoding failed. ffmpeg/avlib returned error code: {0}\n\nCommand:{1}".format(
                    proc.returncode, conversion_command))

        out_f.write(p_out)
        out_f.seek(0)
        return out_f

        # old way: use AudioSegment, easy but slow (reads the whole ogg to wav, then exports the segment to ogg again)
        # logger.info("loading audio")
        # if self.filename in audiocache:
        #     song = audiocache[self.filename]
        # else:
        #     song = AudioSegment.from_file(self.filename)
        #     audiocache[self.filename] = song
        # logger.info("loaded audio")

        # if start < 0 and end < 0:
        #     extract = AudioSegment.silent(
        #         duration=end - start, frame_rate=song.frame_rate)
        # else:
        #     if start < 0:
        #         preroll = AudioSegment.silent(
        #             duration=start * -1, frame_rate=song.frame_rate)
        #         start = 0
        #     else:
        #         preroll = None
        #     if end > len(song):
        #         postroll = AudioSegment.silent(
        #             duration=end - len(song), frame_rate=song.frame_rate)
        #         end = len(song) - 1
        #     else:
        #         postroll = None
        #     extract = song[start:end]
        #     if preroll:
        #         extract = preroll + extract
        #     if postroll:
        #         extract += postroll

        # # Saving
        # return extract.export(None, format=format)


class AnnotationIndex:
    def __init__(
        self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
    ) -> None:
        self.filename = filename
        self.drawing_dir = drawing_dir
        self.metadata_dir = metadata_dir
        self.root_tag = getRootTag()

        # disk cache disabled because of glitches; a plain dict replaces
        # shelve.open(filename, writeback=True)
        self.shelve = {}

    def refresh(self):
        logger.info("refreshing")

        # reset the index
        for key in list(self.shelve.keys()):
            print(key)
            del self.shelve[key]

        self.shelve["_drawings"] = {
            d.id: d
            for d in [
                Drawing(fn, self.metadata_dir, self.drawing_dir)
                for fn in self.get_drawing_filenames()
            ]
        }
        self.root_tag = getRootTag()
        self.shelve['_tags'] = {tag.id: [] for tag in self.root_tag.descendants}
        self.shelve['_annotations'] = {}

        drawing: Drawing
        for drawing in self.shelve['_drawings'].values():
            meta = drawing.get_metadata()
            if 'annotations' not in meta:
                continue
            for ann in meta['annotations']:
                annotation = Annotation(
                    ann['tag'], drawing, ann['t_in'], ann['t_out'],
                    ann['comment'] if 'comment' in ann else "")
                self.shelve['_annotations'][annotation.id] = annotation
                if annotation.tag not in self.shelve['_tags']:
                    self.shelve['_tags'][annotation.tag] = [annotation]
                else:
                    self.shelve['_tags'][annotation.tag].append(annotation)

    @property
    def drawings(self) -> dict[str, Drawing]:
        return self.shelve["_drawings"]

    @property
    def tags(self) -> dict[str, list[Annotation]]:
        return self.shelve["_tags"]

    @property
    def annotations(self) -> dict[str, Annotation]:
        return self.shelve["_annotations"]

    def has_tag(self, tag):
        return tag in self.tags

    def get_annotations_for_tag(self, tag_id) -> list[Annotation]:
        if tag_id not in self.tags:
            return []
        return self.tags[tag_id]

    def get_drawing_names(self) -> list[str]:
        return [
            name[:-16]
            for name in os.listdir(self.drawing_dir)
            if name.endswith("json_appendable")
            and os.stat(os.path.join(self.drawing_dir, name)).st_size > 0
        ]
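
    # Drawing event logs are stored as "<id>.json_appendable" files (JSON values
    # separated by commas; get_animation() wraps the file in brackets to parse it).
    # get_drawing_names() strips that 16-character suffix to recover the id; the
    # method below turns the ids back into full paths.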

    def get_drawing_filenames(self) -> list[Filename]:
        return [
            os.path.join(self.drawing_dir, f"{name}.json_appendable")
            for name in self.get_drawing_names()
        ]

    def get_nested_annotations_for_tag(self, tag_id) -> list[Annotation]:
        tag = self.root_tag.find_by_id(tag_id)
        annotations = []
        for tag in tag.descendants_incl_self():
            annotations.extend(self.get_annotations_for_tag(tag.id))
        return annotations

    def __del__(self):
        # only a real shelve needs closing; the in-memory dict used above does not
        if hasattr(self.shelve, "close"):
            self.shelve.close()


# Point = tuple[float, float, float]
class Point:
    def __init__(self, x: float, y: float, last: bool, t: Seconds):
        self.x = float(x)
        self.y = float(y)  # if y == 0 it can still be an integer... odd python
        self.last = last
        self.t = t

    @classmethod
    def fromTuple(cls, p: tuple[float, float, int, float]):
        return cls(p[0], p[1], bool(p[2]), p[3])

    def scaledToFit(self, dimensions: dict[str, float]) -> Point:
        # TODO: change so that it actually scales to FIT the dimensions
        return Point(self.x, self.y, self.last, self.t)

    def asList(self) -> list:
        return [self.x, self.y, 1 if self.last else 0, self.t]


Points = list[Point]
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]


class Stroke:
    def __init__(self, color: str, points: Points) -> None:
        self.color = color
        self.points = points

    def asDict(self) -> dict:
        return {"color": self.color, "points": [p.asList() for p in self.points]}

    def add_to_dwg(self, dwg: SvgDrawing):
        path = svgwrite.path.Path(d=self.get_as_d()).stroke(
            self.color, 1).fill("none")
        dwg.add(path)

    # def get_bounding_box(self) -> Viewbox:
    #     min_x, max_x = float("inf"), float("-inf")
    #     min_y, max_y = float("inf"), float("-inf")
    #     for p in self.points:
    #         if p.x < min_x:
    #             min_x = p.x
    #         if p.x > max_x:
    #             max_x = p.x
    #         if p.y < min_y:
    #             min_y = p.y
    #         if p.y > max_y:
    #             max_y = p.y
    #     return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)

    def get_as_d(self):
        d = ""
        prev_point = None
        cmd = ""
        for point in self.points:
            if not prev_point:
                # TODO multiply points by scalars for dimensions (height/width of drawing)
                d += f'M{point.x:.6},{point.y:.6} '
                cmd = 'M'
            else:
                if prev_point.last:
                    d += " m"
                    cmd = "m"
                elif cmd != 'l':
                    d += ' l '
                    cmd = 'l'
                diff_point = {
                    "x": point.x - prev_point.x,
                    "y": point.y - prev_point.y,
                }
                # TODO multiply points by scalars for dimensions (height/width of drawing)
                d += f'{diff_point["x"]:.6},{diff_point["y"]:.6} '
            prev_point = point
        return d


class StrokeSlice(Stroke):
    def __init__(self, stroke: Stroke, i_in: Optional[int] = None, i_out: Optional[int] = None,
                 t_offset: Seconds = 0) -> None:
        self.stroke = stroke
        self.i_in = 0 if i_in is None else i_in
        self.i_out = len(self.stroke.points) - 1 if i_out is None else i_out

        # deepcopy the points, because slices can be offset in time
        self.points = copy.deepcopy(self.stroke.points[self.i_in:self.i_out + 1])
        for p in self.points:
            p.t -= t_offset

    def slice_id(self):
        return f"{self.i_in}-{self.i_out}"

    # @property
    # def points(self) -> Points:
    #     return self.stroke.points[self.i_in:self.i_out + 1]

    @property
    def color(self) -> str:
        return self.stroke.color


def strokes2D(strokes):
    # strokes to a d attribute for a path
    d = ""
    last_stroke = None
    cmd = ""
    for stroke in strokes:
        if not last_stroke:
            d += f"M{stroke[0]},{stroke[1]} "
            cmd = 'M'
        else:
            if last_stroke[2] == 1:
                d += " m"
                cmd = 'm'
            elif cmd != 'l':
                d += ' l '
                cmd = 'l'

            rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]]
            d += f"{rel_stroke[0]},{rel_stroke[1]} "
        last_stroke = stroke
    return d
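

# Illustrative sketch (not part of the original module): how a Stroke encodes its
# points as an SVG path "d" attribute (an absolute M for the first point, then
# relative l/m commands). The coordinates and colour are arbitrary.
def _example_stroke_d() -> str:
    points = [
        Point(10.0, 10.0, False, 0.0),
        Point(20.0, 15.0, False, 100.0),
        Point(30.0, 25.0, True, 200.0),  # 'last' marks the end of a sub-line
    ]
    return Stroke("#ff0000", points).get_as_d()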


class Tag(NodeMixin):
    def __init__(self, id, name=None, description="", color=None, parent=None, children=None):
        self.id = id
        self.name = self.id if name is None else name
        self.color = color
        self.description = description
        self.parent = parent
        if children:
            self.children = children

        if self.id == 'root' and not self.is_root:
            logger.error("Root node shouldn't have a parent assigned")

    def __repr__(self):
        return f"<Tag {self.id}>"

    def __str__(self):
        return RenderTree(self).by_attr('name')

    def get_color(self):
        if self.color is None and self.parent is not None:
            return self.parent.get_color()
        return self.color

    def descendants_incl_self(self):
        return tuple(iterators.PreOrderIter(self))

    def find_by_id(self, tag_id) -> Optional[Tag]:
        for t in self.descendants:
            if t.id == tag_id:
                return t
        return None

    def toJson(self) -> str:
        return JsonExporter(indent=2).export(self)


def loadTagFromJson(string) -> Tag:
    tree: Tag = JsonImporter(DictImporter(Tag)).import_(string)
    return tree


def getRootTag(file='www/tags.json') -> Tag:
    with open(file, 'r') as fp:
        tree: Tag = JsonImporter(DictImporter(Tag)).read(fp)
        return tree
    # print(RenderTree(tree))
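

# Illustrative sketch (not part of the original module): loading the tag tree and
# inspecting it from the command line. The tag id is a hypothetical placeholder,
# and www/tags.json must exist for this to run.
if __name__ == "__main__":
    root = getRootTag()
    print(RenderTree(root).by_attr('name'))
    tag = root.find_by_id('some-tag-id')  # hypothetical tag id
    if tag:
        print(tag.id, tag.get_color(), len(tag.descendants_incl_self()))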