diff --git a/pyproject.toml b/pyproject.toml index 646625a..33066d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "Draw an animated vector image" authors = ["Ruben van de Ven "] [tool.poetry.dependencies] -python = "^3.9" +python = "^3.7" tornado = "^6.1" coloredlogs = "^15.0.1" pydub = "^0.25.1" diff --git a/svganim.service b/svganim.service new file mode 100644 index 0000000..e86264f --- /dev/null +++ b/svganim.service @@ -0,0 +1,11 @@ +[Unit] +Description=SVG animation interfaces +[Service] +ExecStart=/home/svganim/.poetry/bin/poetry webserver.py +WorkingDirectory=/home/svganim/svganim +User=svganim +Restart=on-failure +[Install] +WantedBy=multi-user.target + + diff --git a/svganim/strokes.py b/svganim/strokes.py index 03582c2..45a4396 100644 --- a/svganim/strokes.py +++ b/svganim/strokes.py @@ -2,11 +2,12 @@ from __future__ import annotations import json from os import X_OK, PathLike import os -from webserver import strokes2D from typing import Optional, Union import shelve from pydub import AudioSegment import svgwrite +import tempfile +import io class Annotation: @@ -16,28 +17,27 @@ class Annotation: self.t_out = t_out self.drawing = drawing + @property + def id(self) -> str: + return f'{self.drawing.id}:{self.tag}:{self.t_in}:{self.t_out}' + + def getAnimationSlice(self) -> AnimationSlice: return self.drawing.get_animation().getSlice(self.t_in, self.t_out) - def get_as_svg(self): - anim = self.getAnimationSlice() - bb = anim.get_bounding_box() - dwg = svgwrite.Drawing('/tmp/test.svg', size=(bb.width, bb.height)) - anim.add_to_dwg(dwg) - dwg.save() - # TODO .... wip - print('saved!') - + def get_as_svg(self) -> str: + return self.getAnimationSlice().get_as_svg() Filename = Union[str, bytes, PathLike[str], PathLike[bytes]] class Drawing: - def __init__(self, filename: Filename, metadata_dir: Filename) -> None: + def __init__(self, filename: Filename, metadata_dir: Filename, basedir: Filename) -> None: self.eventfile = filename self.id = os.path.splitext(os.path.basename(self.eventfile))[0] self.metadata_fn = os.path.join(metadata_dir, f"{self.id}.json") + self.basedir = basedir def get_url(self) -> str: return f"/files/{self.id}" @@ -60,6 +60,15 @@ class Drawing: }, } + def get_audio(self) -> Optional[AudioSlice]: + md = self.get_metadata() + if 'audio' not in md: + return None + if 'file' not in md['audio']: + return None + + return AudioSlice(filename=os.path.join(self.basedir,md['audio']['file'][1:]), offset=md['audio']['offset']*1000) + def get_animation(self) -> AnimationSlice: # with open(self.eventfile, "r") as fp: strokes = [] @@ -80,10 +89,10 @@ class Drawing: strokes.append( Stroke( event["color"], - [Point.fromTuple(tuple(p)) for p in event["points"]], + [Point.fromTuple(tuple(p)).scaled(self.get_canvas_metadata()['dimensions']) for p in event["points"]], ) ) - return AnimationSlice(strokes) + return AnimationSlice(strokes, audioslice=self.get_audio() ) def get_metadata(self): canvas = self.get_canvas_metadata() @@ -117,11 +126,12 @@ class AnimationSlice: # either a whole drawing or the result of applying an annotation to a drawing (an excerpt) # TODO rename to AnimationSlice to include audio as well def __init__( - self, strokes: list[Stroke], t_in: float = 0, t_out: float = None + self, strokes: list[Stroke], t_in: float = 0, t_out: float = None, audioslice: AudioSlice = None ) -> None: self.strokes = strokes self.t_in = t_in self.t_out = t_out + self.audio = audioslice # TODO: Audio def get_bounding_box(self) -> Viewbox: @@ -144,12 +154,29 @@ class AnimationSlice: frame_in = self.getIndexForTime(t_in) frame_out = self.getIndexForTime(t_out) strokes = self.getStrokeSlices(frame_in, frame_out) - return AnimationSlice(strokes, t_in, t_out) + audio = self.audio.getSlice(t_in, t_out) if self.audio else None + return AnimationSlice(strokes, t_in, t_out, audio) + + + def get_as_svg_dwg(self) -> svgwrite.Drawing: + box = self.get_bounding_box() + (_, fn) = tempfile.mkstemp(suffix='.svg', text=True) + dwg = svgwrite.Drawing(fn, size=(box.width, box.height)) + dwg.viewbox(box.x, box.y, box.width, box.height) + self.add_to_dwg(dwg) + return dwg + + def get_as_svg(self) -> str: + dwg = self.get_as_svg_dwg() + fp = io.StringIO() + dwg.write(fp, pretty=True) + return fp.getvalue() def add_to_dwg(self, dwg: SvgDrawing): group = svgwrite.container.Group() for stroke in self.strokes: stroke.add_to_dwg(group) + dwg.add(group) def getStrokeSlices( self, index_in: FrameIndex, index_out: FrameIndex @@ -200,16 +227,41 @@ class AnimationSlice: class AudioSlice: - def __init__(self, filename: Filename, t_in: float, t_out: float): + def __init__(self, filename: Filename, t_in: float=None, t_out: float=None, offset:float = None): self.filename = filename self.t_in = t_in # in ms self.t_out = t_out # in ms + self.offset = offset # in ms + + def getSlice(self, t_in: float, t_out: float) -> AnimationSlice: + return AudioSlice(self.filename, t_in, t_out, self.offset) def export(self, format="mp3"): """Returns file descriptor of tempfile""" # Opening file and extracting segment song = AudioSegment.from_file(self.filename) - extract = song[self.t_in : self.t_out] + start = self.t_in - self.offset + end = self.t_out - self.offset + + if start < 0 and end < 0: + extract = AudioSegment.silent(duration=end-start, frame_rate=song.frame_rate) + else: + if start < 0: + preroll = AudioSegment.silent(duration=start * -1, frame_rate=song.frame_rate) + start = 0 + else: + preroll = None + if end > len(song): + postroll = AudioSegment.silent(duration=end - len(song), frame_rate=song.frame_rate) + end = len(song) - 1 + else: + postroll = None + + extract = song[start: end] + if preroll: + extract = preroll + extract + if postroll: + extract += postroll # Saving return extract.export(None, format=format) @@ -223,42 +275,54 @@ class AnnotationIndex: self.drawing_dir = drawing_dir self.metadata_dir = metadata_dir - self.index = shelve.open(filename) + self.shelve = shelve.open(filename, writeback=True) - def initiate(self): + def refresh(self): # reset the index - for key in self.index: - del self.index[key] + for key in self.shelve: + del self.shelve[key] - self.index["_drawings"] = { + self.shelve["_drawings"] = { d.id: d for d in [ - Drawing(fn, self.metadata_dir) for fn in self.get_drawing_filenames() + Drawing(fn, self.metadata_dir, self.drawing_dir) for fn in self.get_drawing_filenames() ] } + self.shelve['_tags'] = {} + self.shelve['_annotations'] = {} drawing: Drawing - for drawing in self.index['_drawings'].values(): + for drawing in self.shelve['_drawings'].values(): meta = drawing.get_metadata() if 'annotations' not in meta: continue for ann in meta['annotations']: annotation = Annotation(ann['tag'], drawing, ann['t_in'], ann['t_out']) - if annotation.tag not in self.index: - self.index[annotation.tag] = [annotation] + self.shelve['_annotations'][annotation.id] = annotation + if annotation.tag not in self.shelve['_tags']: + self.shelve['_tags'][annotation.tag] = [annotation] else: - self.index[annotation.tag].append( + self.shelve['_tags'][annotation.tag].append( annotation ) + @property def drawings(self) -> dict[str, Drawing]: - return self.index["_drawings"] + return self.shelve["_drawings"] - def get(self, tag) -> list[Annotation]: - if tag not in self.index: + @property + def tags(self) -> dict[str, list[Annotation]]: + return self.shelve["_tags"] + + @property + def annotations(self) -> dict[str, Annotation]: + return self.shelve["_annotations"] + + def get_annotations(self, tag) -> list[Annotation]: + if tag not in self.tags: return [] - return self.index[tag] + return self.tags[tag] def get_drawing_names(self) -> list[str]: return [ @@ -274,7 +338,7 @@ class AnnotationIndex: ] def __del__(self): - self.index.close() + self.shelve.close() # Point = tuple[float, float, float] @@ -290,6 +354,9 @@ class Point: @classmethod def fromTuple(cls, p: tuple[float, float, int, float]): return cls(p[0], p[1], bool(p[2]), p[3]) + + def scaled(self, dimensions: dict[str, float]) -> Point: + return Point(self.x*dimensions['width'], self.y * dimensions['height'], self.last, self.t) Points = list[Point] @@ -301,7 +368,7 @@ class Stroke: self.points = points def add_to_dwg(self, dwg: SvgDrawing): - path = svgwrite.path.Path(self.get_as_d).stroke(self.color,1).fill("none") + path = svgwrite.path.Path(d=self.get_as_d()).stroke(self.color,1).fill("none") dwg.add(path) @@ -341,7 +408,7 @@ class Stroke: "y": point.y - prev_point.y, } # TODO multiply points by scalars for dimensions (height widht of drawing) - d += f'{diff_point.x:.6},{diff_point.y:.6} ' + d += f'{diff_point["x"]:.6},{diff_point["y"]:.6} ' prev_point = point return d @@ -362,3 +429,26 @@ class StrokeSlice(Stroke): @property def color(self) -> str: return self.stroke.color + + +def strokes2D(strokes): + # strokes to a d attribute for a path + d = ""; + last_stroke = None; + cmd = ""; + for stroke in strokes: + if not last_stroke: + d += f"M{stroke[0]},{stroke[1]} " + cmd = 'M' + else: + if last_stroke[2] == 1: + d += " m" + cmd = 'm' + elif cmd != 'l': + d+=' l ' + cmd = 'l' + + rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]]; + d += f"{rel_stroke[0]},{rel_stroke[1]} " + last_stroke = stroke + return d \ No newline at end of file diff --git a/webserver.py b/webserver.py index 79d9bda..6853655 100644 --- a/webserver.py +++ b/webserver.py @@ -11,30 +11,34 @@ import html import argparse import coloredlogs import glob +import svganim.strokes +logger = logging.getLogger("svganim.webserver") class DateTimeEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, datetime.datetime): - return o.isoformat(timespec='milliseconds') + return o.isoformat(timespec="milliseconds") return super().default(self, o) + class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler): def set_extra_headers(self, path): """For subclass to add extra headers to the response""" - if path[-5:] == '.html': + if path[-5:] == ".html": self.set_header("Access-Control-Allow-Origin", "*") - if path[-4:] == '.svg': + if path[-4:] == ".svg": self.set_header("Content-Type", "image/svg+xml") class WebSocketHandler(tornado.websocket.WebSocketHandler): """ - Websocket from the workers + Websocket from the drawing """ + # CORS_ORIGINS = ['localhost'] connections = set() @@ -51,29 +55,39 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): # return valid # the client connected - def open(self, p = None): + def open(self, p=None): self.__class__.connections.add(self) - self.prefix = datetime.datetime.now().strftime('%Y-%m-%d-') - self.filename = self.prefix + str(self.check_filenr()) + '-' + uuid.uuid4().hex[:6] + self.prefix = datetime.datetime.now().strftime("%Y-%m-%d-") + self.filename = ( + self.prefix + str(self.check_filenr()) + "-" + uuid.uuid4().hex[:6] + ) logger.info(f"{self.filename=}") - self.write_message(json.dumps({ - "filename": self.filename - })) + self.write_message(json.dumps({"filename": self.filename})) def check_filenr(self): - files = glob.glob(os.path.join(self.config.storage, self.prefix +'*')) + files = glob.glob(os.path.join(self.config.storage, self.prefix + "*")) return len(files) + 1 - + def appendEvent(self, row): # write to an appendable json format. So basically a file that should be wrapped in [] to be json-parsable - with open(os.path.join(self.config.storage,self.filename +'.json_appendable'), 'a') as fp: + with open( + os.path.join(self.config.storage, self.filename + ".json_appendable"), "a" + ) as fp: if not self.hasWritten: - #metadata to first row, but only on demand - fp.write(json.dumps([datetime.datetime.now().strftime("%Y-%m-%d %T"), self.dimensions[0], self.dimensions[1]])) + # metadata to first row, but only on demand + fp.write( + json.dumps( + [ + datetime.datetime.now().strftime("%Y-%m-%d %T"), + self.dimensions[0], + self.dimensions[1], + ] + ) + ) # writer.writerow() self.hasWritten = True - fp.write(',\n') + fp.write(",\n") # first column is color, rest is points fp.write(json.dumps(row)) @@ -83,18 +97,18 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): try: msg = json.loads(message) - if msg['event'] == 'stroke': - logger.info('stroke') + if msg["event"] == "stroke": + logger.info("stroke") self.appendEvent(msg) - elif msg['event'] == 'dimensions': - self.dimensions = [int(msg['width']), int(msg['height'])] + elif msg["event"] == "dimensions": + self.dimensions = [int(msg["width"]), int(msg["height"])] logger.info(f"{self.dimensions=}") - elif msg['event'] == 'viewbox': - logger.info('move or resize') + elif msg["event"] == "viewbox": + logger.info("move or resize") self.appendEvent(msg) else: # self.send({'alert': 'Unknown request: {}'.format(message)}) - logger.warn('Unknown request: {}'.format(message)) + logger.warn("Unknown request: {}".format(message)) except Exception as e: # self.send({'alert': 'Invalid request: {}'.format(e)}) @@ -103,25 +117,24 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): # client disconnected def on_close(self): self.__class__.rmConnection(self) - + logger.info(f"Client disconnected: {self.request.remote_ip}") - @classmethod def rmConnection(cls, client): if client not in cls.connections: return cls.connections.remove(client) - + @classmethod def hasConnection(cls, client): return client in cls.connections - + class AudioListingHandler(tornado.web.RequestHandler): def initialize(self, config): self.config = config - self.audiodir = os.path.join(self.config.storage, 'audio') + self.audiodir = os.path.join(self.config.storage, "audio") def get(self): # filename = self.get_argument("file", None) @@ -129,10 +142,16 @@ class AudioListingHandler(tornado.web.RequestHandler): if not os.path.exists(self.audiodir): names = [] else: - names = sorted([f"/audio/{name}" for name in os.listdir(self.audiodir) if name not in ['.gitignore']]) + names = sorted( + [ + f"/audio/{name}" + for name in os.listdir(self.audiodir) + if name not in [".gitignore"] + ] + ) print(names) self.write(json.dumps(names)) - + class AnimationHandler(tornado.web.RequestHandler): def initialize(self, config): @@ -141,88 +160,149 @@ class AnimationHandler(tornado.web.RequestHandler): def get(self, filename): self.set_header("Content-Type", "application/json") # filename = self.get_argument("file", None) - if filename == '': + if filename == "": files = [] - names = [name for name in os.listdir(self.config.storage) if name.endswith('json_appendable')] + names = [ + name + for name in os.listdir(self.config.storage) + if name.endswith("json_appendable") + ] for name in names: - with open(os.path.join(self.config.storage, name), 'r') as fp: + with open(os.path.join(self.config.storage, name), "r") as fp: first_line = fp.readline().strip() - if first_line.endswith(','): + if first_line.endswith(","): first_line = first_line[:-1] print(first_line) metadata = json.loads(first_line) - files.append({ - 'name': f"/files/{name[:-16]}", - "time": metadata[0], - "dimensions": [metadata[1], metadata[2]], - }) - - files.sort(key=lambda k: k['time']) + files.append( + { + "name": f"/files/{name[:-16]}", + "time": metadata[0], + "dimensions": [metadata[1], metadata[2]], + } + ) + + files.sort(key=lambda k: k["time"]) self.write(json.dumps(files)) else: - path = os.path.join(self.config.storage,os.path.basename(filename)+".json_appendable") - drawing = { - "file": filename, - "shape": [] - } - with open(path, 'r') as fp: - events = json.loads('['+fp.read()+']') + path = os.path.join( + self.config.storage, os.path.basename(filename) + ".json_appendable" + ) + drawing = {"file": filename, "shape": []} + with open(path, "r") as fp: + events = json.loads("[" + fp.read() + "]") for i, event in enumerate(events): if i == 0: # metadata on first line - drawing['time'] = event[0] - drawing['dimensions'] = [event[1], event[2]] + drawing["time"] = event[0] + drawing["dimensions"] = [event[1], event[2]] else: - if event['event'] == 'viewbox': + if event["event"] == "viewbox": pass - if event['event'] == 'stroke': + if event["event"] == "stroke": # points = [] # for i in range(int(len(stroke) / 4)): # p = stroke[i*4:i*4+4] # points.append([float(p[0]), float(p[1]), int(p[2]), float(p[3])]) - drawing['shape'].append({ - 'color': event['color'], - 'points': event['points'] - }) + drawing["shape"].append( + {"color": event["color"], "points": event["points"]} + ) self.write(json.dumps(drawing)) -def strokes2D(strokes): - # strokes to a d attribute for a path - d = ""; - last_stroke = None; - cmd = ""; - for stroke in strokes: - if not last_stroke: - d += f"M{stroke[0]},{stroke[1]} " - cmd = 'M' + +class TagHandler(tornado.web.RequestHandler): + """List all tags""" + + def initialize(self, config, index: svganim.strokes.AnnotationIndex): + self.config = config + self.index = index + self.metadir = os.path.join(self.config.storage, "metadata") + + def get(self): + self.set_header("Content-Type", "application/json") + tags = self.index.tags.keys() + self.write(json.dumps(list(tags))) + + +class TagAnnotationsHandler(tornado.web.RequestHandler): + """List all annotations for given tag""" + + def initialize(self, config, index: svganim.strokes.AnnotationIndex): + self.config = config + self.index = index + self.metadir = os.path.join(self.config.storage, "metadata") + + def get(self, tag): + if tag not in self.index.tags: + raise tornado.web.HTTPError(404) + + self.set_header("Content-Type", "application/json") + annotations = self.index.tags[tag] + self.write(json.dumps(list([a.id for a in annotations]))) + + +class AnnotationHandler(tornado.web.RequestHandler): + """Get annotation as svg""" + + def initialize(self, config, index: svganim.strokes.AnnotationIndex): + self.config = config + self.index = index + self.metadir = os.path.join(self.config.storage, "metadata") + + def get(self, annotation_id): + if annotation_id[-4:] == ".svg": + extension = "svg" + annotation_id = annotation_id[:-4] + elif annotation_id[-4:] == ".mp3": + extension = "mp3" + annotation_id = annotation_id[:-4] + elif annotation_id[-4:] == ".wav": + extension = "wav" + annotation_id = annotation_id[:-4] else: - if last_stroke[2] == 1: - d += " m" - cmd = 'm' - elif cmd != 'l': - d+=' l ' - cmd = 'l' + extension = None - rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]]; - d += f"{rel_stroke[0]},{rel_stroke[1]} " - last_stroke = stroke - return d + logger.info(f"annotation {annotation_id=}, {extension=}") + if annotation_id not in self.index.annotations: + raise tornado.web.HTTPError(404) + + annotation = self.index.annotations[annotation_id] + + if extension == "svg": + self.set_header("Content-Type", "image/svg+xml") + self.write(annotation.get_as_svg()) + elif extension == "mp3": + self.set_header("Content-Type", "audio/mp3") + self.write(annotation.getAnimationSlice().audio.export(format="mp3").read()) + elif extension == "wav": + self.set_header("Content-Type", "audio/wav") + self.write(annotation.getAnimationSlice().audio.export(format="wav").read()) + else: + self.set_header("Content-Type", "application/json") + self.write(json.dumps({ + "id": annotation.id, + "tag": annotation.tag, + "audio": f"/annotation/{annotation.id}.mp3", + })) - class AnnotationsHandler(tornado.web.RequestHandler): def initialize(self, config): self.config = config - self.metadir = os.path.join(self.config.storage, 'metadata') + self.metadir = os.path.join(self.config.storage, "metadata") def prepare(self): if self.request.headers.get("Content-Type", "").startswith("application/json"): self.json_args = json.loads(self.request.body) else: self.json_args = None - + def get_filenames(self): - return [name[:-16] for name in os.listdir(self.config.storage) if name.endswith('json_appendable')] + return [ + name[:-16] + for name in os.listdir(self.config.storage) + if name.endswith("json_appendable") + ] def get(self, filename): self.set_header("Content-Type", "application/json") @@ -233,71 +313,106 @@ class AnnotationsHandler(tornado.web.RequestHandler): if filename not in filenames: raise tornado.web.HTTPError(404) - - meta_file = os.path.join(self.metadir, filename +'.json') + meta_file = os.path.join(self.metadir, filename + ".json") if not os.path.exists(meta_file): self.set_status(404) return - with open(meta_file, 'r') as fp: + with open(meta_file, "r") as fp: self.write(json.load(fp)) def post(self, filename): - # filename = self.get_argument("file", None) + # filename = self.argument("file", None) filenames = self.get_filenames() print(filenames, filename) - + if filename not in filenames: raise tornado.web.HTTPError(404) if not os.path.exists(self.metadir): os.mkdir(self.metadir) - meta_file = os.path.join(self.metadir, filename +'.json') - with open(meta_file, 'w') as fp: + meta_file = os.path.join(self.metadir, filename + ".json") + with open(meta_file, "w") as fp: json.dump(self.json_args, fp) +class IndexHandler(tornado.web.RequestHandler): + """Get annotation as svg""" + + def initialize(self, config, index: svganim.strokes.AnnotationIndex): + self.config = config + self.index = index + + def get(self): + self.render("templates/index.html", index=self.index) class Server: """ Server for HIT -> plotter events As well as for the Status interface """ + loop = None def __init__(self, config, logger): self.config = config self.logger = logger - #self.config['server']['port'] - self.web_root = os.path.join('www') + # self.config['server']['port'] + self.web_root = os.path.join("www") + self.index = svganim.strokes.AnnotationIndex( + "annotation_index.shelve", "files", "files/metadata" + ) + self.logger.info("Loading Annotation Index") + self.index.refresh() + self.logger.info("\tloaded annotation index") def start(self): - application = tornado.web.Application([ - (r"/ws(.*)", WebSocketHandler, { - 'config': self.config, - }), - - (r"/files/(.*)", AnimationHandler, - {'config': self.config}), - (r"/audio/(.+)", tornado.web.StaticFileHandler, - {"path": os.path.join(self.config.storage, 'audio')}), - (r"/audio", AudioListingHandler, - {'config': self.config}), - (r"/annotations/(.+)", AnnotationsHandler, - {'config': self.config}), - (r"/(.*)", StaticFileWithHeaderHandler, - {"path": self.web_root}), - ], debug=True, autoreload=True) - application.listen(self.config.port) - tornado.ioloop.IOLoop.current().start() - + application = tornado.web.Application( + [ + ( + r"/ws(.*)", + WebSocketHandler, + { + "config": self.config, + }, + ), + (r"/files/(.*)", AnimationHandler, {"config": self.config}), + ( + r"/audio/(.+)", + tornado.web.StaticFileHandler, + {"path": os.path.join(self.config.storage, "audio")}, + ), + (r"/audio", AudioListingHandler, {"config": self.config}), + (r"/annotations/(.+)", AnnotationsHandler, {"config": self.config}), + (r"/tags", TagHandler, {"config": self.config, "index": self.index}), + ( + r"/tags/(.+)", + TagAnnotationsHandler, + {"config": self.config, "index": self.index}, + ), + ( + r"/annotation/(.+)", + AnnotationHandler, + {"config": self.config, "index": self.index}, + ), + (r"/(.+)", StaticFileWithHeaderHandler, {"path": self.web_root}), + + (r"/", IndexHandler, {"config": self.config, "index": self.index}), + ], + debug=True, + autoreload=True, + ) + application.listen(self.config.port) + tornado.ioloop.IOLoop.current().start() + if __name__ == "__main__": argParser = argparse.ArgumentParser( - description='Start up the vector animation server') + description="Start up the vector animation server" + ) # argParser.add_argument( # '--config', # '-c', @@ -305,50 +420,40 @@ if __name__ == "__main__": # type=str, # help='The yaml config file to load' # ) + argParser.add_argument("--port", type=int, default=7890, help="Port") argParser.add_argument( - '--port', - type=int, - default=7890, - help='Port' - ) - argParser.add_argument( - '--storage', - type=str, - default='files', - help='directory name for output files' - ) - argParser.add_argument( - '--verbose', - '-v', - action='count', default=0 + "--storage", type=str, default="files", help="directory name for output files" ) + argParser.add_argument("--verbose", "-v", action="count", default=0) args = argParser.parse_args() - loglevel = logging.NOTSET if args.verbose > 1 else logging.DEBUG if args.verbose > 0 else logging.INFO + loglevel = ( + logging.NOTSET + if args.verbose > 1 + else logging.DEBUG + if args.verbose > 0 + else logging.INFO + ) coloredlogs.install( level=loglevel, -# default: "%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s" - fmt="%(asctime)s %(hostname)s %(name)s[%(process)d,%(threadName)s] %(levelname)s %(message)s" + # default: "%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s" + fmt="%(asctime)s %(hostname)s %(name)s[%(process)d,%(threadName)s] %(levelname)s %(message)s", ) # File logging - formatter = logging.Formatter(fmt='%(asctime)s %(module)s:%(lineno)d %(levelname)8s | %(message)s', - datefmt='%Y/%m/%d %H:%M:%S') # %I:%M:%S %p AM|PM format + formatter = logging.Formatter( + fmt="%(asctime)s %(module)s:%(lineno)d %(levelname)8s | %(message)s", + datefmt="%Y/%m/%d %H:%M:%S", + ) # %I:%M:%S %p AM|PM format logFileHandler = logging.handlers.RotatingFileHandler( - 'log/draw_log.log', - maxBytes=1024*512, - backupCount=5 + "log/draw_log.log", maxBytes=1024 * 512, backupCount=5 ) logFileHandler.setFormatter(formatter) - logger = logging.getLogger("sorteerhoed") - logger.addHandler( - logFileHandler - ) + logger.addHandler(logFileHandler) logger.info(f"Start server: http://localhost:{args.port}") - + server = Server(args, logger) server.start() -