773 lines
26 KiB
Python
773 lines
26 KiB
Python
from __future__ import annotations
|
|
import asyncio
|
|
import copy
|
|
from ctypes.wintypes import tagMSG
|
|
import json
|
|
from os import X_OK, PathLike
|
|
import os
|
|
import subprocess
|
|
from typing import Optional, Union
|
|
import shelve
|
|
from pydub import AudioSegment
|
|
import svgwrite
|
|
import tempfile
|
|
import io
|
|
import logging
|
|
from anytree import NodeMixin, RenderTree, iterators
|
|
from anytree.exporter import JsonExporter, DictExporter
|
|
from anytree.importer import JsonImporter, DictImporter
|
|
|
|
# Module-level logger; records are emitted under the name 'svganim.strokes'.
logger = logging.getLogger('svganim.strokes')
|
|
|
|
# Annotation alias for time values expressed in milliseconds.
Milliseconds = float
|
|
# Annotation alias for time values expressed in seconds.
# NOTE(review): some parameters annotated with this alias (e.g. the t_offset
# of AnimationSlice.getStrokeSlices) are passed the millisecond t_in -- confirm
# which unit is intended.
Seconds = float
|
|
|
|
class Annotation:
    """A tag applied to the part of a drawing between ``t_in`` and ``t_out``."""

    def __init__(self, tag: str, drawing: Drawing, t_in: Milliseconds, t_out: Milliseconds, comment: str = None) -> None:
        self.tag = tag
        self.t_in, self.t_out = t_in, t_out
        self.drawing = drawing
        self.comment = comment

    @property
    def id(self) -> str:
        """Identifier built from the drawing id, the tag and both time points."""
        drawing_id = self.drawing.id
        return f'{drawing_id}:{self.tag}:{self.t_in}:{self.t_out}'

    def getAnimationSlice(self) -> AnimationSlice:
        """Return the excerpt of the drawing's animation covered by this annotation."""
        animation = self.drawing.get_animation()
        return animation.getSlice(self.t_in, self.t_out)

    def get_as_svg(self) -> str:
        """Render the annotated excerpt as an SVG string."""
        excerpt = self.getAnimationSlice()
        return excerpt.get_as_svg()

    def getJsonUrl(self) -> str:
        """Return the drawing URL with this annotation's time range as query string."""
        base_url = self.drawing.get_url()
        query = f"?t_in={self.t_in}&t_out={self.t_out}"
        return base_url + query
|
|
|
|
|
|
# Annotation alias for a filesystem path given as str, bytes or os.PathLike.
Filename = Union[str, bytes, PathLike[str], PathLike[bytes]]
|
|
|
|
# Identifies an animation slice as the list [drawing id, t_in, t_out]; both
# times are None when the slice is a whole drawing. Declared as a real type
# alias (the previous value was a plain list instance, not a type).
SliceId = list[Union[str, float, None]]
|
|
|
|
|
|
class Drawing:
    """A recorded drawing backed by an appendable JSON event file.

    The first entry of the event file is a list ``[date, width, height]``
    describing the canvas. Later entries are objects whose ``"event"`` key is
    ``"viewbox"`` or ``"stroke"``. Optional extra metadata (audio,
    annotations) is read from ``<metadata_dir>/<id>.json``.
    """

    def __init__(self, filename: Filename, metadata_dir: Filename, basedir: Filename) -> None:
        self.eventfile = filename
        # The id is the event file's basename without its extension.
        self.id = os.path.splitext(os.path.basename(self.eventfile))[0]
        self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json")
        self.basedir = basedir

    def get_url(self) -> str:
        """Return the URL path under which this drawing is served."""
        return f"/files/{self.id}"

    def get_annotations_url(self) -> str:
        """Return the URL path of this drawing's annotations."""
        return f"/annotations/{self.id}"

    def get_canvas_metadata(self) -> dict:
        """Read the canvas metadata from the first line of the event file.

        Returns:
            ``{"date": ..., "dimensions": {"width": ..., "height": ...}}``.

        Raises:
            OSError: if the event file cannot be opened.
            json.JSONDecodeError: if the first line is not valid JSON.
        """
        logger.info('metadata for %s', self.id)
        with open(self.eventfile, "r", encoding="utf-8") as fp:
            first_line = fp.readline().strip()

        # Entries are comma separated; drop the separator before parsing.
        if first_line.endswith(","):
            first_line = first_line[:-1]
        data = json.loads(first_line)
        return {
            "date": data[0],
            "dimensions": {
                "width": data[1],
                "height": data[2],
            },
        }

    def get_audio(self) -> Optional[AudioSlice]:
        """Return the drawing's audio, or None when the metadata names no audio file."""
        audio = self.get_metadata().get('audio')
        if not audio or 'file' not in audio:
            return None

        # The first character of the stored path is dropped before joining
        # (presumably a leading '/', which would otherwise make os.path.join
        # discard basedir -- confirm). The offset is multiplied by 1000 before
        # it is handed to AudioSlice.
        return AudioSlice(
            filename=os.path.join(self.basedir, audio['file'][1:]),
            drawing=self,
            offset=audio['offset'] * 1000,
        )

    def get_animation(self) -> AnimationSlice:
        """Parse the whole event file into an AnimationSlice covering the full drawing."""
        strokes = []
        viewboxes = []
        with open(self.eventfile, "r", encoding="utf-8") as fp:
            events = json.loads("[" + fp.read() + "]")

        for i, event in enumerate(events):
            if i == 0:
                # Metadata on the first line: use the canvas size as the
                # initial viewbox, valid from the very beginning.
                viewboxes.append(TimedViewbox(-float('Infinity'), 0, 0, event[1], event[2]))
                continue

            if isinstance(event, list):
                # Ignore repeated metadata entries, which appear when an
                # existing drawing is continued.
                continue

            if event["event"] == "viewbox":
                viewboxes.extend(
                    TimedViewbox(b['t'], b['x'], b['y'], b['width'], b['height'])
                    for b in event['viewboxes']
                )
            elif event["event"] == "stroke":
                strokes.append(
                    Stroke(
                        event["color"],
                        [Point.fromTuple(tuple(p)) for p in event["points"]],
                    )
                )
        return AnimationSlice([self.id, None, None], strokes, viewboxes, audioslice=self.get_audio())

    def get_metadata(self):
        """Return the metadata file's contents (or an empty dict) plus a ``"canvas"`` entry."""
        canvas = self.get_canvas_metadata()
        if os.path.exists(self.metadata_fn):
            with open(self.metadata_fn, "r", encoding="utf-8") as fp:
                metadata = json.load(fp)
        else:
            metadata = {}
        metadata["canvas"] = canvas
        return metadata

    def get_absolute_viewbox(self) -> Viewbox:
        """Return the bounding box of all strokes in the drawing."""
        return self.get_animation().get_bounding_box()
|
|
|
|
|
|
class Viewbox:
    """Rectangle described by its origin (x, y) and its width and height."""

    def __init__(self, x: float, y: float, width: float, height: float):
        # Attribute order is kept (x, y, width, height) because the instance
        # __dict__ is serialised elsewhere in this module.
        self.x, self.y = x, y
        self.width, self.height = width, height

    def __str__(self) -> str:
        """Format as the four space-separated numbers ``x y width height``."""
        return f"{self.x} {self.y} {self.width} {self.height}"
|
|
|
|
|
|
class TimedViewbox(Viewbox):
    """A Viewbox that additionally carries the time ``t`` it is associated with."""

    def __init__(self, time: Milliseconds, x: float, y: float, width: float, height: float):
        super().__init__(x=x, y=y, width=width, height=height)
        # Stored under the short name 't'.
        self.t = time
|
|
|
|
|
|
# (stroke index, point index) pair locating one point within a list of strokes.
# NOTE: the AnimationSlice index lookups return (None, None) when no position
# matches, which this alias does not express.
FrameIndex = tuple[int, int]
|
|
|
|
|
|
class AnimationSlice:
    """A whole drawing, or an excerpt of one between ``t_in`` and ``t_out``.

    Holds the strokes, the timed viewboxes and (optionally) the audio slice of
    the animation. ``id`` is a ``[drawing_id, t_in, t_out]`` list; both times
    are ``None`` for a whole drawing.
    """

    def __init__(
        self, slice_id: SliceId, strokes: list[Stroke], viewboxes: list[TimedViewbox] = None, t_in: float = 0, t_out: float = None, audioslice: AudioSlice = None
    ) -> None:
        self.id = slice_id
        self.strokes = strokes
        # Create a fresh list per instance; a shared mutable default would be
        # visible to (and modifiable by) every slice built without viewboxes.
        self.viewboxes = [] if viewboxes is None else viewboxes
        self.t_in = t_in
        self.t_out = t_out
        self.audio = audioslice

    def asDict(self) -> dict:
        """Return a JSON-serialisable dict describing the slice.

        Raises:
            IndexError: if the slice has no viewboxes (the first viewbox
                supplies the canvas dimensions).
        """
        # Work on copies of the viewbox attributes so that replacing the
        # -Infinity start time (used when no t_in is given) does not alter
        # the viewbox objects themselves.
        boxes = [dict(vars(v)) for v in self.viewboxes]
        for box in boxes:
            if box['t'] == -float('Infinity'):
                box['t'] = 0

        return {
            "file": self.getUrl(),
            "time": "-",  # creation date
            # dimensions of drawing canvas
            "dimensions": [self.viewboxes[0].width, self.viewboxes[0].height],
            "shape": [s.asDict() for s in self.strokes],
            "viewboxes": boxes,
            "bounding_box": self.get_bounding_box().__dict__,
            "audio": self.getAudioDict() if self.audio else None,
        }

    def getAudioDict(self):
        """Describe the audio by URL only, with a fixed offset of 0.

        Deliberately does not use ``self.audio.asDict()``.
        """
        return {
            "file": '/files/' + self.getUrl('.mp3'),
            "offset": 0,
        }

    def getUrl(self, extension='') -> str:
        """Return the drawing id, plus extension and time range for an excerpt.

        NOTE(review): when both id times are falsy (None or 0) the bare id is
        returned and ``extension`` is not applied -- confirm this is intended.
        """
        if not self.id[1] and not self.id[2]:
            return self.id[0]

        return self.id[0] + f"{extension}?t_in={self.t_in}&t_out={self.t_out}"

    def get_bounding_box(self, stroke_thickness: float = 3.5) -> Viewbox:
        """Return the box enclosing all points, padded by half the stroke thickness.

        Stroke_thickness 3.5 == 1mm. If it should not be considered, just set
        it to 0. A slice without strokes yields ``Viewbox(0, 0, 0, 0)``.
        """
        if len(self.strokes) == 0:
            return Viewbox(0, 0, 0, 0)

        half = stroke_thickness / 2
        min_x, max_x = float("inf"), float("-inf")
        min_y, max_y = float("inf"), float("-inf")

        for s in self.strokes:
            for p in s.points:
                min_x = min(min_x, p.x - half)
                max_x = max(max_x, p.x + half)
                min_y = min(min_y, p.y - half)
                max_y = max(max_y, p.y + half)
        return Viewbox(min_x, min_y, max_x - min_x, max_y - min_y)

    def getSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> AnimationSlice:
        """Return the excerpt between ``t_in`` and ``t_out`` (both in ms).

        Stroke points and viewboxes of the result are shifted by ``t_in``.
        """
        frame_in = self.getIndexForInPoint(t_in)
        frame_out = self.getIndexForOutPoint(t_out)
        strokes = self.getStrokeSlices(frame_in, frame_out, t_in)
        viewboxes = self.getViewboxesSlice(t_in, t_out)

        audio = self.audio.getSlice(t_in, t_out) if self.audio else None
        return AnimationSlice([self.id[0], t_in, t_out], strokes, viewboxes, t_in, t_out, audio)

    def get_as_svg_dwg(self) -> svgwrite.Drawing:
        """Build an svgwrite drawing sized and cropped to the bounding box."""
        box = self.get_bounding_box()
        fd, fn = tempfile.mkstemp(suffix='.svg', text=True)
        # mkstemp hands back an open OS-level handle that the caller owns;
        # only the file name is used here, so close it to avoid leaking it.
        os.close(fd)
        dwg = svgwrite.Drawing(fn, size=(box.width, box.height))
        dwg.viewbox(box.x, box.y, box.width, box.height)
        self.add_to_dwg(dwg)
        dwg.defs.add(
            dwg.style("path{stroke-width:1mm;stroke-linecap: round;}"))
        return dwg

    def get_as_svg(self) -> str:
        """Return the slice rendered as a pretty-printed SVG document string."""
        dwg = self.get_as_svg_dwg()
        fp = io.StringIO()
        dwg.write(fp, pretty=True)
        return fp.getvalue()

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add all strokes to ``dwg``, wrapped in a single group."""
        group = svgwrite.container.Group()
        for stroke in self.strokes:
            stroke.add_to_dwg(group)
        dwg.add(group)

    def getViewboxesSlice(self, t_in: Milliseconds, t_out: Milliseconds) -> list[TimedViewbox]:
        """Extract the viewboxes for in- and outpoints.

        If there's one before inpoint, move that to the t_in, so that animation starts at the right position
        the slice is offset by t_in ms
        """
        viewboxes = []
        for viewbox in self.viewboxes:
            if viewbox.t > t_out:
                break

            if viewbox.t <= t_in:
                # Keep only the latest box from at or before the in point,
                # placed at time 0 of the slice.
                firstbox = TimedViewbox(
                    0, viewbox.x, viewbox.y, viewbox.width, viewbox.height)
                if not viewboxes:
                    viewboxes.append(firstbox)
                else:
                    viewboxes[0] = firstbox
                continue

            viewboxes.append(TimedViewbox(viewbox.t - t_in, viewbox.x, viewbox.y, viewbox.width, viewbox.height))
        return viewboxes

    def getStrokeSlices(
        self, index_in: FrameIndex, index_out: FrameIndex, t_offset: Seconds = 0
    ) -> list[Stroke]:
        """Get list of Stroke/StrokeSlice based in in and out indexes
        Based on annotation.js getStrokesSliceForPathRange(in_point, out_point)

        If either in point or out point is [None, None], return an empty set.
        ``t_offset`` is handed to every StrokeSlice.
        """
        slices = []
        if index_in[0] is None and index_in[1] is None:
            # No in point was found: nothing to slice.
            return slices

        if index_out[0] is None and index_out[1] is None:
            # No out point was found: nothing to slice.
            return slices

        for i in range(index_in[0], index_out[0] + 1):
            try:
                stroke = self.strokes[i]
            except IndexError:
                # The out index may lie beyond the available strokes; stop
                # as soon as the end is reached.
                break

            # Only the first and last stroke are cut; strokes in between are
            # taken whole.
            in_i = index_in[1] if index_in[0] == i else 0
            out_i = index_out[1] if index_out[0] == i else len(
                stroke.points) - 1

            slices.append(StrokeSlice(stroke, in_i, out_i, t_offset))
        return slices

    def getIndexForInPoint(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) based on the given time
        The In point version (so the first index after ms)
        Equal to annotations.js findPositionForTime(ms)

        Returns ``(None, None)`` when every stroke ends before ``ms``.
        """
        path_i = None
        point_i = None
        for i, stroke in enumerate(self.strokes):
            start_at = stroke.points[0].t
            end_at = stroke.points[-1].t
            if end_at < ms:
                # certainly not the right point yet
                continue
            if start_at > ms:
                # too far, so this is the first point after in point
                path_i = i
                point_i = 0
                break

            # The in point lies between the first and last point of this
            # stroke: take the first point after it (or the last point when
            # none is later).
            path_i = i
            for pi, point in enumerate(stroke.points):
                point_i = pi
                if point.t > ms:
                    break
            break

        if path_i is None or point_i is None:
            logger.warning("in point after last stroke. Not sure if this works")
        return (path_i, point_i)

    def getIndexForOutPoint(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) based on the given time
        The Out point version (so the last index before ms)
        Equal to annotations.js findPositionForTime(ms)
        """
        return self.getIndexForTime(ms)

    def getIndexForTime(self, ms: Milliseconds) -> FrameIndex:
        """Get the frame index (path, point) based on the given time
        Equal to annotations.js findPositionForTime(ms)

        Returns ``(None, None)`` when there are no strokes or the first
        stroke starts after ``ms``.
        """
        path_i = None
        point_i = None
        for i, stroke in enumerate(self.strokes):
            start_at = stroke.points[0].t
            end_at = stroke.points[-1].t

            if start_at > ms:
                break  # too far
            if end_at > ms:
                # ms falls inside this stroke: take the last point that is
                # not later than ms.
                path_i = i
                for pi, point in enumerate(stroke.points):
                    if point.t > ms:
                        break  # too far
                    point_i = pi
                break
            else:
                # In case this is the last stroke, remember its final point
                # as the best option so far.
                path_i = i
                point_i = len(stroke.points) - 1

        if path_i is None or point_i is None:
            # Only reachable when no stroke starts at or before ms.
            logger.warning("OUT point before first stroke. Not sure if this works")
        return (path_i, point_i)
|
|
|
|
# Module-level cache of loaded audio keyed by filename. In this file it is
# only referenced from the commented-out AudioSegment code in AudioSlice.export.
audiocache = {}
|
|
|
|
class AudioSlice:
    """An audio file belonging to a drawing, optionally limited to a time range.

    ``t_in``, ``t_out`` and ``offset`` are in milliseconds.
    """

    def __init__(self, filename: Filename, drawing: Drawing, t_in: Milliseconds = None, t_out: Milliseconds = None, offset: Milliseconds = None):
        self.filename = filename
        self.drawing = drawing
        self.t_in = t_in  # in ms
        self.t_out = t_out  # in ms
        self.offset = offset  # in ms TODO: use from self.drawing metadata

    def getSlice(self, t_in: float, t_out: float) -> AudioSlice:
        """Return a new AudioSlice of the same file limited to ``t_in``..``t_out``."""
        return AudioSlice(self.filename, self.drawing, t_in, t_out, self.offset)

    def asDict(self):
        """Return a dict holding only the audio URL (the offset is not included)."""
        return {
            "file": self.getUrl(),
        }

    def getUrl(self):
        """Return the audio URL, with ``t_in``/``t_out`` query parameters when set.

        Parameters whose value is falsy (None or 0) are left out.
        """
        fn = self.filename.replace("../files/audio", "/file/")

        params = []
        if self.t_in:
            params.append(f"t_in={self.t_in}")
        if self.t_out:
            # Previously emitted self.t_in under the t_out key.
            params.append(f"t_out={self.t_out}")
        if params:
            fn += "?" + "&".join(params)
        return fn

    async def export(self, format="mp3"):
        """Encode the slice with ffmpeg and return it as an in-memory buffer.

        ``t_in``, ``t_out`` and ``offset`` must all be set.

        Args:
            format: container/codec name passed to ffmpeg's ``-f`` option.

        Returns:
            An ``io.BytesIO`` positioned at the start of the encoded audio.

        Raises:
            Exception: if ffmpeg exits with a non-zero return code.
        """
        # Millisecond precision is enough.
        start = int(self.t_in - self.offset)
        end = int(self.t_out - self.offset)

        # Call ffmpeg directly with the in and out point, so no unnecessary
        # data is loaded and no double conversion (e.g. ogg -> wav -> ogg)
        # is performed.
        conversion_command = [
            "ffmpeg",
            '-ss', f"{start}ms",
            '-to', f"{end}ms",
            "-i", self.filename,  # ss before input, so not whole file is loaded
            "-f", format, '-',  # to stdout
        ]

        logger.info("ffmpeg start")
        proc = await asyncio.create_subprocess_exec(
            *conversion_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL)

        p_out, p_err = await proc.communicate()

        logger.info("ffmpeg finished")

        if proc.returncode != 0:
            # Previously referenced an undefined name here, which hid the
            # real failure behind a NameError.
            raise Exception(
                "Encoding failed. ffmpeg/avlib returned error code: {0}\n\nCommand:{1}".format(
                    proc.returncode, conversion_command))

        return io.BytesIO(p_out)
|
|
|
|
# old way, use AudioSegment, easy but slow (reads whole ogg to wav, then export segment to ogg again)
|
|
# logger.info("loading audio")
|
|
# if self.filename in audiocache:
|
|
# song = audiocache[self.filename]
|
|
# else:
|
|
# song = AudioSegment.from_file(self.filename)
|
|
# audiocache[self.filename] = song
|
|
# logger.info("loaded audio")
|
|
|
|
# if start < 0 and end < 0:
|
|
# extract = AudioSegment.silent(
|
|
# duration=end-start, frame_rate=song.frame_rate)
|
|
# else:
|
|
# if start < 0:
|
|
# preroll = AudioSegment.silent(
|
|
# duration=start * -1, frame_rate=song.frame_rate)
|
|
# start = 0
|
|
# else:
|
|
# preroll = None
|
|
# if end > len(song):
|
|
# postroll = AudioSegment.silent(
|
|
# duration=end - len(song), frame_rate=song.frame_rate)
|
|
# end = len(song) - 1
|
|
# else:
|
|
# postroll = None
|
|
|
|
# extract = song[start: end]
|
|
# if preroll:
|
|
# extract = preroll + extract
|
|
# if postroll:
|
|
# extract += postroll
|
|
|
|
# # Saving
|
|
# return extract.export(None, format=format)
|
|
|
|
|
|
class AnnotationIndex:
    """In-memory index of drawings, tags and annotations.

    The index is kept in ``self.shelve`` under the keys ``"_drawings"``,
    ``"_tags"`` and ``"_annotations"``; these only exist after ``refresh()``
    has run.
    """

    def __init__(
        self, filename: Filename, drawing_dir: Filename, metadata_dir: Filename
    ) -> None:
        self.filename = filename
        self.drawing_dir = drawing_dir
        self.metadata_dir = metadata_dir

        self.root_tag = getRootTag()

        # Disk cache (shelve.open(filename, writeback=True)) is disabled
        # because of glitches; a plain dict is used instead.
        self.shelve = {}

    def refresh(self):
        """Rebuild the whole index from the drawing and metadata directories."""
        logger.info("refreshing")
        # reset the index
        logger.debug("dropping index keys: %s", list(self.shelve.keys()))
        self.shelve.clear()

        self.shelve["_drawings"] = {
            d.id: d
            for d in (
                Drawing(fn, self.metadata_dir, self.drawing_dir)
                for fn in self.get_drawing_filenames()
            )
        }

        # Reload the tag tree so that annotation counts start from zero.
        self.root_tag = getRootTag()
        self.shelve['_tags'] = {tag.id: [] for tag in self.root_tag.descendants}
        self.shelve['_annotations'] = {}

        drawing: Drawing
        for drawing in self.shelve['_drawings'].values():
            meta = drawing.get_metadata()
            if 'annotations' not in meta:
                continue
            for ann in meta['annotations']:
                annotation = Annotation(
                    ann['tag'], drawing, ann['t_in'], ann['t_out'], ann.get('comment', ""))
                self.shelve['_annotations'][annotation.id] = annotation
                if annotation.tag not in self.shelve['_tags']:
                    # Tags missing from the tag tree are still indexed, but
                    # reported.
                    self.shelve['_tags'][annotation.tag] = [annotation]
                    logger.error(f"Use of non-existing tag {annotation.tag}")
                else:
                    self.shelve['_tags'][annotation.tag].append(annotation)
                tag = self.root_tag.find_by_id(annotation.tag)
                if tag is not None:
                    tag.annotation_count += 1

    @property
    def drawings(self) -> dict[str, Drawing]:
        """Drawings by id."""
        return self.shelve["_drawings"]

    @property
    def tags(self) -> dict[str, list[Annotation]]:
        """Annotations grouped by tag id."""
        return self.shelve["_tags"]

    @property
    def annotations(self) -> dict[str, Annotation]:
        """Annotations by annotation id."""
        return self.shelve["_annotations"]

    def has_tag(self, tag):
        """Return whether ``tag`` is a known tag id."""
        return tag in self.tags

    def get_annotations_for_tag(self, tag_id) -> list[Annotation]:
        """Return the annotations carrying exactly ``tag_id`` (empty if unknown)."""
        if tag_id not in self.tags:
            return []
        return self.tags[tag_id]

    def get_drawing_names(self) -> list[str]:
        """Return the names of all non-empty ``*.json_appendable`` files, without suffix."""
        suffix = ".json_appendable"
        # Match on the full suffix including the dot, so the name stripped
        # here is exactly what get_drawing_filenames() re-appends it to.
        return [
            name[:-len(suffix)]
            for name in os.listdir(self.drawing_dir)
            if name.endswith(suffix) and os.stat(os.path.join(self.drawing_dir, name)).st_size > 0
        ]

    def get_drawing_filenames(self) -> list[Filename]:
        """Return the paths of all drawing event files in the drawing directory."""
        return [
            os.path.join(self.drawing_dir, f"{name}.json_appendable")
            for name in self.get_drawing_names()
        ]

    def get_nested_annotations_for_tag(self, tag_id) -> list[Annotation]:
        """Return annotations for ``tag_id`` and for all tags below it in the tag tree.

        A tag id that is not found in the tree has no descendants, so only
        annotations carrying that id directly are returned.
        """
        start = self.root_tag.find_by_id(tag_id)
        if start is None:
            return list(self.get_annotations_for_tag(tag_id))

        annotations = []
        for node in start.descendants_incl_self():
            annotations.extend(self.get_annotations_for_tag(node.id))
        return annotations

    def __del__(self):
        # The backing store is a plain dict while the disk cache is disabled;
        # only call close() when the store provides it (e.g. a real shelf).
        # getattr also covers an instance whose __init__ did not complete.
        close = getattr(getattr(self, "shelve", None), "close", None)
        if callable(close):
            close()
|
|
|
|
|
|
# Point = tuple[float, float, float]
|
|
|
|
|
|
class Point:
    """One recorded point of a stroke: position, end-of-segment flag and time."""

    def __init__(self, x: float, y: float, last: bool, t: Seconds):
        # Coordinates are forced to float; a value such as 0 may otherwise
        # arrive as an int.
        self.x, self.y = float(x), float(y)
        self.last = last
        self.t = t

    @classmethod
    def fromTuple(cls, p: tuple[float, float, int, float]):
        """Build a point from an ``(x, y, last, t)`` sequence; ``last`` becomes a bool."""
        x, y, last_flag, t = p[0], p[1], p[2], p[3]
        return cls(x, y, bool(last_flag), t)

    def scaledToFit(self, dimensions: dict[str, float]) -> Point:
        """Return a copy of this point; ``dimensions`` is currently not used."""
        # TODO: change so that it actually scales to FIT dimensions
        return Point(self.x, self.y, self.last, self.t)

    def asList(self) -> list:
        """Return ``[x, y, last, t]`` with ``last`` encoded as 1 or 0."""
        last_flag = 1 if self.last else 0
        return [self.x, self.y, last_flag, self.t]
|
|
|
|
|
|
# Annotation alias for the list of points that makes up a stroke.
Points = list[Point]
|
|
# Annotation alias for the svgwrite containers that strokes can be added to.
SvgDrawing = Union[svgwrite.container.SVG, svgwrite.container.Group]
|
|
|
|
|
|
class Stroke:
    """A coloured sequence of points that can be rendered as one SVG path."""

    def __init__(self, color: str, points: Points) -> None:
        self.color = color
        self.points = points

    def asDict(self) -> dict:
        """Return the colour and the points (each as a list) as a dict."""
        point_lists = [p.asList() for p in self.points]
        return {"color": self.color, "points": point_lists}

    def add_to_dwg(self, dwg: SvgDrawing):
        """Add this stroke to ``dwg`` as an unfilled path in the stroke's colour."""
        path = svgwrite.path.Path(d=self.get_as_d())
        path = path.stroke(self.color, 1).fill("none")
        dwg.add(path)

    def get_as_d(self):
        """Return the SVG path data (``d`` attribute) for the points.

        The first point is an absolute move; every later point is written
        relative to its predecessor, as a move when the predecessor was
        flagged ``last`` and as a line otherwise.
        """
        parts = []
        previous = None
        command = ""
        for point in self.points:
            if previous:
                if previous.last:
                    parts.append(" m")
                    command = "m"
                elif command != 'l':
                    # Only emit the line command when switching to it.
                    parts.append(' l ')
                    command = 'l'
                dx = point.x - previous.x
                dy = point.y - previous.y
                # TODO multiply points by scalars for dimensions (height width of drawing)
                parts.append(f'{dx:.6},{dy:.6} ')
            else:
                # TODO multiply points by scalars for dimensions (height width of drawing)
                parts.append(f'M{point.x:.6},{point.y:.6} ')
                command = 'M'
            previous = point
        return "".join(parts)
|
|
|
|
|
|
class StrokeSlice(Stroke):
    """The points ``i_in``..``i_out`` (inclusive) of a stroke, shifted in time."""

    def __init__(self, stroke: Stroke, i_in: int = None, i_out: int = None, t_offset: Seconds = 0) -> None:
        # Stroke.__init__ is not called: colour is a read-only property here
        # and the points are copied below.
        self.stroke = stroke

        if i_in is None:
            i_in = 0
        self.i_in = i_in

        if i_out is None:
            i_out = len(self.stroke.points) - 1
        self.i_out = i_out

        # Deep copy so that shifting the times does not touch the points of
        # the original stroke.
        selected = self.stroke.points[self.i_in: self.i_out + 1]
        self.points = copy.deepcopy(selected)
        for point in self.points:
            point.t -= t_offset

    def slice_id(self):
        """Return the slice bounds formatted as ``"<i_in>-<i_out>"``."""
        return f"{self.i_in}-{self.i_out}"

    @property
    def color(self) -> str:
        """Colour of the stroke this slice was taken from."""
        return self.stroke.color
|
|
|
|
|
|
def strokes2D(strokes):
    """Turn a sequence of ``[x, y, last, ...]`` entries into SVG path data.

    The first entry becomes an absolute move. Each later entry is written
    relative to the previous one, as a move when the previous entry's third
    element is 1 and as a line otherwise.
    """
    parts = []
    previous = None
    command = ""
    for current in strokes:
        if previous:
            if previous[2] == 1:
                parts.append(" m")
                command = 'm'
            elif command != 'l':
                # Only emit the line command when switching to it.
                parts.append(' l ')
                command = 'l'

            dx = current[0] - previous[0]
            dy = current[1] - previous[1]
            parts.append(f"{dx},{dy} ")
        else:
            parts.append(f"M{current[0]},{current[1]} ")
            command = 'M'
        previous = current
    return "".join(parts)
|
|
|
|
class Tag(NodeMixin):
    """A node in the tag tree, with an id, display name, colour and description."""

    def __init__(self, id, name = None, description = "", color = None, parent=None, children=None, annotation_count=None):
        # Attribute order is kept as-is; the instance attributes are what the
        # exporter in toJson() writes out.
        self.id = id
        if name is None:
            name = self.id
        self.name = name
        self.color = color
        self.description = description
        self.parent = parent
        if children:
            self.children = children

        # The annotation_count argument is ignored: counting always starts at zero.
        self.annotation_count = 0

        is_misplaced_root = self.id == 'root' and not self.is_root
        if is_misplaced_root:
            logger.error("Root node shouldn't have a parent assigned")

    def __repr__(self):
        return f"<svganim.strokes.Tag {self.id}>"

    def __str__(self):
        """Render the subtree below (and including) this tag by name."""
        tree = RenderTree(self)
        return tree.by_attr('name')

    def get_color(self):
        """Return this tag's colour, falling back to the nearest ancestor that has one."""
        if self.color is not None or self.parent is None:
            return self.color
        return self.parent.get_color()

    def descendants_incl_self(self):
        """Return this tag followed by all its descendants, in pre-order."""
        return tuple(iterators.PreOrderIter(self))

    def find_by_id(self, tag_id) -> Optional[Tag]:
        """Return the first descendant whose id equals ``tag_id``, else None.

        Only descendants are searched, not this tag itself.
        """
        return next((t for t in self.descendants if t.id == tag_id), None)

    def toJson(self, with_counts=False) -> str:
        """Export the subtree as JSON; annotation counts are left out unless requested."""
        def without_counts(attrs):
            return [(k, v) for k, v in attrs if k != "annotation_count"]

        attr_filter = None if with_counts else without_counts
        exporter = JsonExporter(DictExporter(attriter=attr_filter), indent=2)
        return exporter.export(self)
|
|
|
|
def loadTagFromJson(string) -> Tag:
    """Build a Tag tree from a JSON string."""
    importer = JsonImporter(DictImporter(Tag))
    root: Tag = importer.import_(string)
    return root
|
|
|
|
def getRootTag(file = 'www/tags.json') -> Tag:
    """Read the Tag tree from the given JSON file and return its root."""
    with open(file, 'r') as handle:
        importer = JsonImporter(DictImporter(Tag))
        root: Tag = importer.read(handle)

    return root
|
|
|
|
# print(RenderTree(tree))
|