diff --git a/trap/socket_forwarder.py b/trap/socket_forwarder.py index 87445f0..10a92ac 100644 --- a/trap/socket_forwarder.py +++ b/trap/socket_forwarder.py @@ -1,7 +1,9 @@ from argparse import Namespace import asyncio +import dataclasses import errno +import json import logging from multiprocessing import Event import subprocess @@ -15,6 +17,8 @@ import tornado.websocket import zmq import zmq.asyncio +from trap.frame_emitter import Frame + logger = logging.getLogger("trap.forwarder") @@ -112,12 +116,13 @@ class WsRouter: context = zmq.asyncio.Context() self.trajectory_socket = context.socket(zmq.PUB) - self.trajectory_socket.bind(config.zmq_prediction_addr if config.bypass_prediction else config.zmq_trajectory_addr) + self.trajectory_socket.bind(config.zmq_trajectory_addr) self.prediction_socket = context.socket(zmq.SUB) - self.prediction_socket.connect(config.zmq_prediction_addr) + self.prediction_socket.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!! self.prediction_socket.setsockopt(zmq.SUBSCRIBE, b'') - + self.prediction_socket.connect(config.zmq_prediction_addr if not self.config.bypass_prediction else config.zmq_trajectory_addr) + self.application = tornado.web.Application( [ ( @@ -166,11 +171,16 @@ class WsRouter: logger.info("Starting prediction forwarder") while self.is_running.is_set(): # timeout so that if no events occur, loop can still stop on is_running - has_event = await self.prediction_socket.poll(timeout=1) + has_event = await self.prediction_socket.poll(timeout=1000) if has_event: - msg = await self.prediction_socket.recv_string() - logger.debug(f"Forward prediction message of {len(msg)} chars") - WebSocketPredictionHandler.write_to_clients(msg) + try: + frame: Frame = await self.prediction_socket.recv_pyobj() + # tacks = [dataclasses.asdict(h) for t in frame.tracks.values() for t.history in t] + msg = json.dumps(frame.aslist()) + logger.debug(f"Forward prediction message of {len(msg)} chars") + WebSocketPredictionHandler.write_to_clients(msg) + except Exception as e: + logger.exception(e) # die together: self.evt_loop.stop()