trap/trap/frame_emitter.py

115 lines
4.4 KiB
Python

from __future__ import annotations
import logging
import multiprocessing
import pickle
from argparse import ArgumentParser, Namespace
from multiprocessing import Event
from pathlib import Path
import zmq
from trap import node
from trap.base import *
from trap.base import LambdaParser
from trap.timer import Timer
from trap.video_sources import get_video_source
logger = logging.getLogger('trap.frame_emitter')
class FrameEmitter(node.Node):
'''
Emit frame in a separate threat so they can be throttled,
or thrown away when the rest of the system cannot keep up
'''
def setup(self) -> None:
self.frame_sock = self.pub(self.config.zmq_frame_addr)
self.frame_noimg_sock = self.pub(self.config.zmq_frame_noimg_addr)
logger.info(f"Connection socket {self.config.zmq_frame_addr}")
logger.info(f"Connection socket {self.config.zmq_frame_noimg_addr}")
self.video_srcs = self.config.video_src
def run(self):
offset = int(self.config.video_offset or 0)
source = get_video_source(self.video_srcs, self.config.camera, offset, self.config.video_end, self.config.video_loop)
video_gen = enumerate(source, start = offset)
while self.run_loop():
try:
i, img = next(video_gen)
except StopIteration as e:
logger.info("Video source ended")
break
frame = Frame(i, img=img, H=self.config.camera.H, camera=self.config.camera)
# TODO: this is very dirty, need to find another way.
# perhaps multiprocessing Array?
self.frame_noimg_sock.send(pickle.dumps(frame.without_img()))
self.frame_sock.send(pickle.dumps(frame))
logger.info("Stopping")
@classmethod
def arg_parser(cls) -> ArgumentParser:
argparser = LambdaParser()
argparser.add_argument('--zmq-frame-addr',
help='Manually specity communication addr for the frame messages',
type=str,
default="ipc:///tmp/feeds_frame")
argparser.add_argument('--zmq-frame-noimg-addr',
help='Manually specity communication addr for the frame messages',
type=str,
default="ipc:///tmp/feeds_frame2")
argparser.add_argument("--video-src",
help="source video to track from can be either a relative or absolute path, or a url, like an RTSP resource, or use gige://RELATIVE_PATH_TO_GIGE_CONFIG_JSON",
type=UrlOrPath,
nargs='+',
default=lambda: [UrlOrPath(p) for p in Path('../DATASETS/VIRAT_subset_0102x/').glob('*.mp4')])
argparser.add_argument("--video-offset",
help="Start playback from given frame. Note that when src is an array, this applies to all videos individually.",
default=0,
type=int)
argparser.add_argument("--video-end",
help="End (or loop) playback at given frame.",
default=None,
type=int)
argparser.add_argument("--video-loop",
help="By default it emitter will run only once. This allows it to loop the video file to keep testing.",
action='store_true')
argparser.add_argument("--camera-fps",
help="Camera FPS",
type=int,
default=12)
argparser.add_argument("--homography",
help="File with homography params [Deprecated]",
type=Path,
default='../DATASETS/VIRAT_subset_0102x/VIRAT_0102_homography_img2world.txt',
action=HomographyAction)
argparser.add_argument("--calibration",
help="File with camera intrinsics and lens distortion params (calibration.json)",
# type=Path,
required=True,
# default=None,
action=CameraAction)
return argparser
def run_frame_emitter(config: Namespace, is_running: Event, timer_counter: int):
router = FrameEmitter(config, is_running)
router.run(timer_counter)
is_running.clear()