trap/trap/renderer.py

import time
import ffmpeg
from argparse import Namespace
import datetime
import logging
from multiprocessing import Event
from multiprocessing.synchronize import Event as BaseEvent
import cv2
import numpy as np
import zmq
import tempfile
from pathlib import Path
import shutil
from trap.frame_emitter import DetectionState, Frame
logger = logging.getLogger("trap.renderer")
class FrameWriter:
    """
    Drop-in compatible interface with cv2.VideoWriter, but supports a variable
    framerate.
    See https://video.stackexchange.com/questions/25811/ffmpeg-make-video-with-non-constant-framerate-from-image-filenames
    """
    def __init__(self, filename: str, fps: float, frame_size: tuple) -> None:
        self.filename = filename
        self.fps = fps
        self.frame_size = frame_size
        self.tmp_dir = tempfile.TemporaryDirectory(prefix="trap-output-")
        self.i = 0

    def write(self, img: cv2.typing.MatLike):
        self.i += 1
        cv2.imwrite(self.tmp_dir.name + f"/{self.i:07d}.png", img)

    def release(self):
        """Actually write the video"""
        # ffmpeg -f image2 -ts_from_file 2 -i %d.png out.mp4
        logger.info(f"Write frames from {self.tmp_dir.name} to {self.filename}")
        (
            ffmpeg
            # the magic here is in ts_from_file, which uses the mtime of each file as the
            # frame interval. This makes it possible to have non-constant intervals between
            # frames, which is useful since we render frames whenever we receive them.
            .input(self.tmp_dir.name + "/%07d.png", format="image2", ts_from_file=2)
            .output(self.filename, framerate=self.fps)
            .run()
        )
        logger.info(f"Rm frame directory: {self.tmp_dir.name}")
        self.tmp_dir.cleanup()
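
# A minimal usage sketch (not executed here): FrameWriter mirrors the part of the
# cv2.VideoWriter API this module uses (write()/release()). frame_size is stored but,
# unlike cv2.VideoWriter, not enforced, since frames are first dumped as PNGs.
#
#   writer = FrameWriter("/tmp/out.mp4", fps=10, frame_size=(1280, 720))  # hypothetical path
#   for img in frames:            # frames: iterable of BGR numpy arrays
#       writer.write(img)         # writes a numbered PNG into the temp dir
#   writer.release()              # runs ffmpeg over the PNGs, then cleans up
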
class Renderer:
    def __init__(self, config: Namespace, is_running: BaseEvent):
        self.config = config
        self.is_running = is_running

        context = zmq.Context()
        self.prediction_sock = context.socket(zmq.SUB)
        self.prediction_sock.setsockopt(zmq.CONFLATE, 1)  # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!!
        self.prediction_sock.setsockopt(zmq.SUBSCRIBE, b'')
        self.prediction_sock.connect(config.zmq_prediction_addr if not self.config.bypass_prediction else config.zmq_trajectory_addr)

        self.frame_sock = context.socket(zmq.SUB)
        self.frame_sock.setsockopt(zmq.CONFLATE, 1)  # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!!
        self.frame_sock.setsockopt(zmq.SUBSCRIBE, b'')
        self.frame_sock.connect(config.zmq_frame_addr)

        self.H = np.loadtxt(self.config.homography, delimiter=',')
        self.inv_H = np.linalg.pinv(self.H)
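        # Presumably H maps image (pixel) coordinates to world/ground-plane coordinates,
        # so inv_H maps world coordinates back into the image. Note that decorate_frame()
        # below uses the homography shipped with each prediction frame (prediction_frame.H)
        # rather than this one.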
        # TODO: get FPS from frame_emitter
        # self.out = cv2.VideoWriter(str(filename), fourcc, 23.97, (1280,720))
        self.fps = 10
        self.frame_size = (1280, 720)

        self.out_writer = self.start_writer() if self.config.render_file else None
        self.streaming_process = self.start_streaming() if self.config.render_url else None
    def start_writer(self):
        if not self.config.output_dir.exists():
            raise FileNotFoundError(f"Output dir does not exist: {self.config.output_dir}")

        date_str = datetime.datetime.now().isoformat(timespec="minutes")
        filename = self.config.output_dir / f"render_predictions-{date_str}-{self.config.detector}.mp4"
        logger.info(f"Write to {filename}")

        return FrameWriter(str(filename), self.fps, self.frame_size)

        # Fixed-framerate alternative, unreachable after the return above; kept for reference:
        # fourcc = cv2.VideoWriter_fourcc(*'vp09')
        # return cv2.VideoWriter(str(filename), fourcc, self.fps, self.frame_size)
    def start_streaming(self):
        return (
            ffmpeg
            .input('pipe:', format='rawvideo', codec="rawvideo", pix_fmt='bgr24', s='{}x{}'.format(*self.frame_size))
            .output(
                self.config.render_url,
                # codec="copy",  # use the same codecs as the original video
                codec='libx264',
                listen=1,  # enables HTTP server
                pix_fmt="yuv420p",
                preset="ultrafast",
                tune="zerolatency",
                g=f"{self.fps*2}",
                analyzeduration="2000000",
                probesize="1000000",
                f='mpegts'
            )
            .overwrite_output()
            .run_async(pipe_stdin=True)
        )
        # return process
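
    # With listen=1, ffmpeg itself acts as the HTTP server, so (assuming render_url is an
    # http:// address, e.g. the hypothetical http://0.0.0.0:8090/stream.ts) the resulting
    # MPEG-TS stream can be viewed with any HTTP-capable player, for example:
    #
    #   ffplay http://<host>:8090/stream.ts
    #
    # The exact URL depends on the render_url configured for this process.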
    def run(self):
        prediction_frame = None
        i = 0
        first_time = None

        while self.is_running.is_set():
            i += 1

            zmq_ev = self.frame_sock.poll(timeout=2000)
            if not zmq_ev:
                # when no data comes in, loop so that is_running is checked
                continue

            frame: Frame = self.frame_sock.recv_pyobj()
            try:
                prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK)
            except zmq.ZMQError as e:
                logger.debug('reuse prediction')

            if first_time is None:
                first_time = frame.time

            # decorate_frame() draws onto frame.img in place (and also returns it)
            decorate_frame(frame, prediction_frame, first_time, self.config)

            img_path = (self.config.output_dir / f"{i:05d}.png").resolve()
            # cv2.imwrite(str(img_path), img)

            logger.debug(f"write frame {frame.time - first_time:.3f}s")
            if self.out_writer:
                self.out_writer.write(frame.img)
            if self.streaming_process:
                self.streaming_process.stdin.write(frame.img.tobytes())

        logger.info('Stopping')

        if i > 2:
            if self.streaming_process:
                self.streaming_process.stdin.close()

            if self.out_writer:
                self.out_writer.release()

            if self.streaming_process:
                # oddly wrapped, because both close() and release() take time.
                self.streaming_process.wait()
# colorset = itertools.product([0,255], repeat=3)  # but remove white
colorset = [
    (0, 0, 0),
    (0, 0, 255),
    (0, 255, 0),
    (0, 255, 255),
    (255, 0, 0),
    (255, 0, 255),
    (255, 255, 0),
]
def decorate_frame(frame: Frame, prediction_frame: Frame, first_time: float, config: Namespace) -> np.ndarray:
    # dim the frame by blending it with a dark (BGR 130, 0, 75) overlay
    overlay = np.zeros(frame.img.shape, np.uint8)
    overlay[:] = (130, 0, 75)

    frame.img = cv2.addWeighted(frame.img, .4, overlay, .6, 0)
    img = frame.img

    # all not working:
    # if i == 1:
    #     # thanks to GpG for fixing scaling issue: https://stackoverflow.com/a/39668864
    #     scale_factor = 1./20  # from 10m to 1000px
    #     S = np.array([[scale_factor, 0, 0], [0, scale_factor, 0], [0, 0, 1]])
    #     new_H = S * self.H * np.linalg.inv(S)
    #     warpedFrame = cv2.warpPerspective(img, new_H, (1000, 1000))
    #     cv2.imwrite(str(self.config.output_dir / "orig.png"), warpedFrame)

    # black bar for the status line at the top
    cv2.rectangle(img, (0, 0), (img.shape[1], 25), (0, 0, 0), -1)

    if prediction_frame is None:
        cv2.putText(img, f"Waiting for prediction...", (500, 17), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 0), 1)
        # continue
    else:
        inv_H = np.linalg.pinv(prediction_frame.H)
        for track_id, track in prediction_frame.tracks.items():
            if not len(track.history):
                continue

            # coords = cv2.perspectiveTransform(np.array([prediction['history']]), self.inv_H)[0]
            coords = [d.get_foot_coords() for d in track.history]
            confirmations = [d.state == DetectionState.Confirmed for d in track.history]
            # logger.warning(f"{coords=}")

            # draw the observed history as a line that brightens towards the most recent point
            for ci in range(1, len(coords)):
                start = [int(p) for p in coords[ci-1]]
                end = [int(p) for p in coords[ci]]
                # color = (255,255,255) if confirmations[ci] else (100,100,100)
                color = [100 + 155 * ci / len(coords)] * 3
                cv2.line(img, start, end, color, 1, lineType=cv2.LINE_AA)
                cv2.circle(img, end, 2, color, lineType=cv2.LINE_AA)

            if not track.predictions or not len(track.predictions):
                continue

            color = colorset[track_id % len(colorset)]
            for pred_i, pred in enumerate(track.predictions):
                # predictions are in world coordinates; project them back into the image
                pred_coords = cv2.perspectiveTransform(np.array([pred]), inv_H)[0].tolist()
                # color = (128,0,128) if pred_i else (128,128,0)
                for ci in range(0, len(pred_coords)):
                    if ci == 0:
                        # the first prediction segment starts at the last observed position
                        start = [int(p) for p in coords[-1]]
                    else:
                        start = [int(p) for p in pred_coords[ci-1]]
                    end = [int(p) for p in pred_coords[ci]]
                    cv2.line(img, start, end, color, 2, lineType=cv2.LINE_AA)
                    cv2.circle(img, end, 2, color, 1, lineType=cv2.LINE_AA)

        for track_id, track in prediction_frame.tracks.items():
            # draw the tracker marker and track id last; this goes in a second loop
            # so it overlays over _all_ trajectories
            # coords = cv2.perspectiveTransform(np.array([[track.history[-1].get_foot_coords()]]), self.inv_H)[0]
            coords = track.history[-1].get_foot_coords()
            color = colorset[track_id % len(colorset)]

            center = [int(p) for p in coords]
            cv2.circle(img, center, 6, (255, 255, 255), thickness=3)
            (l, t, r, b) = track.history[-1].to_ltrb()
            p1 = (l, t)
            p2 = (r, b)
            # cv2.rectangle(img, p1, p2, (255,0,0), 1)
            cv2.putText(img, f"{track_id} ({(track.history[-1].conf or 0):.2f})", (center[0]+8, center[1]), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.7, thickness=1, color=color, lineType=cv2.LINE_AA)

    base_color = (255,) * 3
    info_color = (255, 255, 0)

    cv2.putText(img, f"{frame.index:06d}", (20, 17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1)
    cv2.putText(img, f"{frame.time - first_time:.3f}s", (120, 17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1)

    if prediction_frame is not None:
        # render Δ frames and Δt (how far the prediction lags behind the wall clock)
        cv2.putText(img, f"{prediction_frame.index - frame.index}", (90, 17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1)
        cv2.putText(img, f"{prediction_frame.time - time.time():.2f}s", (200, 17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1)
        cv2.putText(img, f"{len(prediction_frame.tracks)} tracks", (500, 17), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1)
        cv2.putText(img, f"h: {np.average([len(t.history or []) for t in prediction_frame.tracks.values()]):.2f}", (580, 17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1)
        cv2.putText(img, f"ph: {np.average([len(t.predictor_history or []) for t in prediction_frame.tracks.values()]):.2f}", (660, 17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1)
        cv2.putText(img, f"p: {np.average([len(t.predictions or []) for t in prediction_frame.tracks.values()]):.2f}", (740, 17), cv2.FONT_HERSHEY_PLAIN, 1, info_color, 1)

    options = []
    for option in ['prediction_horizon', 'num_samples', 'full_dist', 'gmm_mode', 'z_mode', 'model_dir']:
        options.append(f"{option}: {config.__dict__[option]}")

    cv2.putText(img, options.pop(-1), (20, img.shape[0]-30), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1)
    cv2.putText(img, " | ".join(options), (20, img.shape[0]-10), cv2.FONT_HERSHEY_PLAIN, 1, base_color, 1)

    return img
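
# A minimal sketch of the coordinate round-trip used above (assuming H maps image pixels
# to ground-plane coordinates, as loaded by Renderer; file name is hypothetical):
#
#   H = np.loadtxt("homography.txt", delimiter=',')
#   inv_H = np.linalg.pinv(H)
#   foot_px = np.array([[[640.0, 500.0]]], dtype=np.float64)   # one detection foot point, in pixels
#   world = cv2.perspectiveTransform(foot_px, H)                # pixel -> world
#   back_px = cv2.perspectiveTransform(world, inv_H)            # world -> pixel, ~= foot_px
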
def run_renderer(config: Namespace, is_running: BaseEvent):
    renderer = Renderer(config, is_running)
    renderer.run()
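
# A minimal invocation sketch (untested; attribute names are inferred from the code above,
# and the real project presumably builds this Namespace from its own CLI parser):
#
#   from argparse import Namespace
#   from multiprocessing import Event
#   from pathlib import Path
#
#   is_running = Event()
#   is_running.set()
#   config = Namespace(
#       zmq_frame_addr="ipc:///tmp/trap_frames",             # hypothetical addresses
#       zmq_prediction_addr="ipc:///tmp/trap_predictions",
#       zmq_trajectory_addr="ipc:///tmp/trap_trajectories",
#       bypass_prediction=False,
#       homography="homography.txt",                         # hypothetical file
#       render_file=True,
#       render_url=None,
#       output_dir=Path("out/"),
#       detector="yolo",                                     # hypothetical detector name
#       prediction_horizon=30, num_samples=5, full_dist=False,
#       gmm_mode=True, z_mode=False, model_dir="models/",
#   )
#   run_renderer(config, is_running)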