trap/trap/frame_emitter.py
2023-10-17 09:45:37 +02:00

68 lines
No EOL
2.2 KiB
Python

from argparse import Namespace
from dataclasses import dataclass, field
import logging
from multiprocessing import Event
import pickle
import sys
import time
from typing import Optional
import numpy as np
import cv2
import zmq
logger = logging.getLogger('trap.frame_emitter')
@dataclass
class Frame:
img: np.array
time: float= field(default_factory=lambda: time.time())
trajectories: Optional[dict] = None
class FrameEmitter:
'''
Emit frame in a separate threat so they can be throttled,
or thrown away when the rest of the system cannot keep up
'''
def __init__(self, config: Namespace, is_running: Event) -> None:
self.config = config
self.is_running = is_running
context = zmq.Context()
self.frame_sock = context.socket(zmq.PUB)
self.frame_sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. make sure to set BEFORE connect/bind
self.frame_sock.bind(config.zmq_frame_addr)
logger.info(f"Connection socket {config.zmq_frame_addr}")
def emit_video(self):
video = cv2.VideoCapture(str(self.config.video_src))
fps = video.get(cv2.CAP_PROP_FPS)
frame_duration = 1./fps
prev_time = time.time()
while self.is_running.is_set():
ret, img = video.read()
# seek to 0 if video has finished. Infinite loop
if not ret:
video.set(cv2.CAP_PROP_POS_FRAMES, 0)
ret, img = video.read()
assert ret is not False # not really error proof...
frame = Frame(img=img)
# TODO: this is very dirty, need to find another way.
# perhaps multiprocessing queue?
self.frame_sock.send(pickle.dumps(frame))
# defer next loop
new_frame_time = time.time()
time_diff = (new_frame_time - prev_time)
if time_diff < frame_duration:
time.sleep(frame_duration - time_diff)
new_frame_time += frame_duration - time_diff
else:
prev_time = new_frame_time
def run_frame_emitter(config: Namespace, is_running: Event):
router = FrameEmitter(config, is_running)
router.emit_video()