Feature: render to zmq and tag frame with index

This commit is contained in:
Ruben van de Ven 2023-12-06 10:24:45 +01:00
parent 104098d371
commit ec9bb357fd
6 changed files with 794 additions and 1142 deletions

1813
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -26,6 +26,9 @@ torchvision = [
] ]
deep-sort-realtime = "^1.3.2" deep-sort-realtime = "^1.3.2"
ultralytics = "^8.0.200" ultralytics = "^8.0.200"
ffmpeg-python = "^0.2.0"
torchreid = "^0.2.5"
gdown = "^4.7.1"
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]

View file

@ -217,7 +217,17 @@ tracker_parser.add_argument("--detector",
# Renderer # Renderer
render_parser.add_argument("--render-preview", render_parser.add_argument("--render-file",
help="Render a video file previewing the prediction, and its delay compared to the current frame", help="Render a video file previewing the prediction, and its delay compared to the current frame",
action='store_true') action='store_true')
render_parser.add_argument("--render-url",
help="""Stream renderer on given URL. Two easy approaches:
- using zmq wrapper one can specify the LISTENING ip. To listen to any incoming connection: zmq:tcp://0.0.0.0:5556
- alternatively, using e.g. UDP one needs to specify the IP of the client. E.g. udp://100.69.123.91:5556/stream
Note that with ZMQ you can have multiple clients connecting simultaneously. E.g. using `ffplay zmq:tcp://100.109.175.82:5556`
When using udp, connecting can be done using `ffplay udp://100.109.175.82:5556/stream`
""",
type=str,
default=None)

View file

@ -3,10 +3,11 @@ from dataclasses import dataclass, field
from itertools import cycle from itertools import cycle
import logging import logging
from multiprocessing import Event from multiprocessing import Event
from pathlib import Path
import pickle import pickle
import sys import sys
import time import time
from typing import Optional from typing import Iterable, Optional
import numpy as np import numpy as np
import cv2 import cv2
import zmq import zmq
@ -15,6 +16,7 @@ logger = logging.getLogger('trap.frame_emitter')
@dataclass @dataclass
class Frame: class Frame:
index: int
img: np.array img: np.array
time: float= field(default_factory=lambda: time.time()) time: float= field(default_factory=lambda: time.time())
trajectories: Optional[dict] = None trajectories: Optional[dict] = None
@ -37,9 +39,9 @@ class FrameEmitter:
logger.info(f"Connection socket {config.zmq_frame_addr}") logger.info(f"Connection socket {config.zmq_frame_addr}")
if self.config.video_loop: if self.config.video_loop:
self.video_srcs = cycle(self.config.video_src) self.video_srcs: Iterable[Path] = cycle(self.config.video_src)
else: else:
self.video_srcs = self.config.video_src self.video_srcs: [Path] = self.config.video_src
def emit_video(self): def emit_video(self):
@ -51,6 +53,7 @@ class FrameEmitter:
logger.info(f"Emit frames at {fps} fps") logger.info(f"Emit frames at {fps} fps")
prev_time = time.time() prev_time = time.time()
i = 0
while self.is_running.is_set(): while self.is_running.is_set():
ret, img = video.read() ret, img = video.read()
@ -61,7 +64,13 @@ class FrameEmitter:
# video.set(cv2.CAP_PROP_POS_FRAMES, 0) # video.set(cv2.CAP_PROP_POS_FRAMES, 0)
# ret, img = video.read() # ret, img = video.read()
# assert ret is not False # not really error proof... # assert ret is not False # not really error proof...
frame = Frame(img=img)
if "DATASETS/hof/" in str(video_path):
# hack to mask out area
cv2.rectangle(img, (0,0), (800,200), (0,0,0), -1)
frame = Frame(index=i, img=img)
# TODO: this is very dirty, need to find another way. # TODO: this is very dirty, need to find another way.
# perhaps multiprocessing Array? # perhaps multiprocessing Array?
self.frame_sock.send(pickle.dumps(frame)) self.frame_sock.send(pickle.dumps(frame))
@ -74,11 +83,14 @@ class FrameEmitter:
new_frame_time += frame_duration - time_diff new_frame_time += frame_duration - time_diff
else: else:
prev_time = new_frame_time prev_time = new_frame_time
i += 1
if not self.is_running.is_set(): if not self.is_running.is_set():
# if not running, also break out of infinite generator loop # if not running, also break out of infinite generator loop
break break
logger.info("Stopping") logger.info("Stopping")

View file

@ -73,7 +73,7 @@ def start():
ExceptionHandlingProcess(target=run_tracker, kwargs={'config': args, 'is_running': isRunning}, name='tracker'), ExceptionHandlingProcess(target=run_tracker, kwargs={'config': args, 'is_running': isRunning}, name='tracker'),
] ]
if args.render_preview: if args.render_file or args.render_url:
procs.append( procs.append(
ExceptionHandlingProcess(target=run_renderer, kwargs={'config': args, 'is_running': isRunning}, name='renderer') ExceptionHandlingProcess(target=run_renderer, kwargs={'config': args, 'is_running': isRunning}, name='renderer')
) )

View file

@ -1,4 +1,4 @@
import ffmpeg
from argparse import Namespace from argparse import Namespace
import datetime import datetime
import logging import logging
@ -33,16 +33,48 @@ class Renderer:
self.inv_H = np.linalg.pinv(self.H) self.inv_H = np.linalg.pinv(self.H)
# TODO: get FPS from frame_emitter
# self.out = cv2.VideoWriter(str(filename), fourcc, 23.97, (1280,720))
self.fps = 10
self.frame_size = (1280,720)
self.out_writer = self.start_writer() if self.config.render_file else None
self.streaming_process = self.start_streaming() if self.config.render_url else None
def start_writer(self):
if not self.config.output_dir.exists(): if not self.config.output_dir.exists():
raise FileNotFoundError("Path does not exist") raise FileNotFoundError("Path does not exist")
date_str = datetime.datetime.now().isoformat(timespec="minutes") date_str = datetime.datetime.now().isoformat(timespec="minutes")
filename = self.config.output_dir / f"render_predictions-{date_str}.mp4" filename = self.config.output_dir / f"render_predictions-{date_str}-{self.config.detector}.mp4"
logger.info(f"Write to {filename}") logger.info(f"Write to {filename}")
fourcc = cv2.VideoWriter_fourcc(*'vp09') fourcc = cv2.VideoWriter_fourcc(*'vp09')
# TODO: get FPS from frame_emitter
self.out = cv2.VideoWriter(str(filename), fourcc, 23.97, (1280,720)) return cv2.VideoWriter(str(filename), fourcc, self.fps, self.frame_size)
def start_streaming(self):
return (
ffmpeg
.input('pipe:', format='rawvideo',codec="rawvideo", pix_fmt='bgr24', s='{}x{}'.format(*self.frame_size))
.output(
self.config.render_url,
#codec = "copy", # use same codecs of the original video
codec='libx264',
listen=1, # enables HTTP server
pix_fmt="yuv420p",
preset="ultrafast",
tune="zerolatency",
g=f"{self.fps*2}",
analyzeduration="2000000",
probesize="1000000",
f='mpegts'
)
.overwrite_output()
.run_async(pipe_stdin=True)
)
# return process
def run(self): def run(self):
@ -51,6 +83,12 @@ class Renderer:
first_time = None first_time = None
while self.is_running.is_set(): while self.is_running.is_set():
i+=1 i+=1
zmq_ev = self.frame_sock.poll(timeout=3)
if not zmq_ev:
# when no data comes in, loop so that is_running is checked
continue
frame: Frame = self.frame_sock.recv_pyobj() frame: Frame = self.frame_sock.recv_pyobj()
try: try:
predictions = self.prediction_sock.recv_json(zmq.NOBLOCK) predictions = self.prediction_sock.recv_json(zmq.NOBLOCK)
@ -75,40 +113,60 @@ class Renderer:
coords = cv2.perspectiveTransform(np.array([prediction['history']]), self.inv_H)[0] coords = cv2.perspectiveTransform(np.array([prediction['history']]), self.inv_H)[0]
# logger.warning(f"{coords=}") # logger.warning(f"{coords=}")
center = [int(p) for p in coords[-1]]
cv2.circle(img, center, 5, (0,255,0))
cv2.putText(img, track_id, (center[0]+8, center[1]), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(0,255,0))
for ci in range(1, len(coords)): for ci in range(1, len(coords)):
start = [int(p) for p in coords[ci-1]] start = [int(p) for p in coords[ci-1]]
end = [int(p) for p in coords[ci]] end = [int(p) for p in coords[ci]]
cv2.line(img, start, end, (255,255,255), 2) cv2.line(img, start, end, (255,255,255), 2, lineType=cv2.LINE_AA)
if not 'predictions' in prediction or not len(prediction['predictions']): if not 'predictions' in prediction or not len(prediction['predictions']):
continue continue
for pred in prediction['predictions']: for pred_i, pred in enumerate(prediction['predictions']):
pred_coords = cv2.perspectiveTransform(np.array([pred]), self.inv_H)[0] pred_coords = cv2.perspectiveTransform(np.array([pred]), self.inv_H)[0]
color = (0,0,255) if pred_i == 1 else (100,100,100)
for ci in range(1, len(pred_coords)): for ci in range(1, len(pred_coords)):
start = [int(p) for p in pred_coords[ci-1]] start = [int(p) for p in pred_coords[ci-1]]
end = [int(p) for p in pred_coords[ci]] end = [int(p) for p in pred_coords[ci]]
cv2.line(img, start, end, (0,0,255), 1) cv2.line(img, start, end, color, 1, lineType=cv2.LINE_AA)
for track_id, prediction in predictions.items():
# draw tracker marker and track id last so it lies over the trajectories
# this goes is a second loop so it overlays over _all_ trajectories
coords = cv2.perspectiveTransform(np.array([[prediction['history'][-1]]]), self.inv_H)[0]
center = [int(p) for p in coords[-1]]
cv2.circle(img, center, 5, (0,255,0))
p1 = (prediction['bbox'][0], prediction['bbox'][1])
p2 = (p1[0] + prediction['bbox'][2], p1[1] + prediction['bbox'][3])
cv2.rectangle(img, p1, p2, (255,0,0), 1)
cv2.putText(img, f"{track_id} ({(prediction['det_conf'] or 0):.2f})", (center[0]+8, center[1]), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.7, thickness=2, color=(0,255,0), lineType=cv2.LINE_AA)
if first_time is None: if first_time is None:
first_time = frame.time first_time = frame.time
cv2.putText(img, f"{frame.time - first_time:.3f}s", (20,50), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1) cv2.putText(img, f"{frame.index:06d}", (20,50), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1)
cv2.putText(img, f"{frame.time - first_time:.3f}s", (100,50), cv2.FONT_HERSHEY_PLAIN, 1, (255,255,0), 1)
img_path = (self.config.output_dir / f"{i:05d}.png").resolve() img_path = (self.config.output_dir / f"{i:05d}.png").resolve()
# cv2.imwrite(str(img_path), img) # cv2.imwrite(str(img_path), img)
self.out.write(img) logger.info(f"write frame {frame.time - first_time:.3f}s")
if self.out_writer:
self.out_writer.write(img)
if self.streaming_process:
self.streaming_process.stdin.write(img.tobytes())
logger.info('Stopping') logger.info('Stopping')
if i>2: if i>2:
self.out.release() if self.streaming_process:
self.streaming_process.stdin.close()
if self.out_writer:
self.out_writer.release()
if self.streaming_process:
# oddly wrapped, because both close and release() take time.
self.streaming_process.wait()