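"""Prediction server for trap: an online Trajectron++ wrapper.

Subscribes to pickled Frame objects from the tracker over ZMQ, feeds each
track's projected history to an OnlineTrajectron model, attaches the used
histories and sampled predictions to the tracks, and republishes the
augmented frames on the prediction socket.
"""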
# adapted from Trajectron++ online_server.py
from argparse import Namespace
import json
import logging
from multiprocessing import Event, Queue
import os
import pathlib
import pickle
import random
import sys
import time
import traceback
import warnings

import dill
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import zmq

from trajectron.environment import Environment, Scene
from trajectron.environment.data_utils import derivative_of
from trajectron.environment.node import Node
from trajectron.environment.node_type import NodeType
from trajectron.model.model_registrar import ModelRegistrar
from trajectron.model.online.online_trajectron import OnlineTrajectron
from trajectron.utils import prediction_output_to_trajectories

from trap.frame_emitter import Frame
from trap.tracker import Track, Smoother

logger = logging.getLogger("trap.prediction")


# if not torch.cuda.is_available() or self.config.device == 'cpu':
#     self.config.device = torch.device('cpu')
# else:
#     if torch.cuda.device_count() == 1:
#         # If you have CUDA_VISIBLE_DEVICES set, which you should,
#         # then this will prevent leftover flag arguments from
#         # messing with the device allocation.
#         self.config.device = 'cuda:0'
#
# self.config.device = torch.device(self.config.device)


def create_online_env(env, hyperparams, scene_idx, init_timestep):
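    """Build a single-scene online Environment from scene `scene_idx` of `env`.

    The scene is clipped to the `maximum_history_length` timesteps leading up
    to `init_timestep`, and its scene graph is recalculated so that an
    OnlineTrajectron can run incremental inference on it.
    """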
    test_scene = env.scenes[scene_idx]

    online_scene = Scene(timesteps=init_timestep + 1,
                         map=test_scene.map,
                         dt=test_scene.dt)
    online_scene.nodes = test_scene.get_nodes_clipped_at_time(
        timesteps=np.arange(init_timestep - hyperparams['maximum_history_length'],
                            init_timestep + 1),
        state=hyperparams['state'])
    online_scene.robot = test_scene.robot
    online_scene.calculate_scene_graph(attention_radius=env.attention_radius,
                                       edge_addition_filter=hyperparams['edge_addition_filter'],
                                       edge_removal_filter=hyperparams['edge_removal_filter'])

    return Environment(node_type_list=env.node_type_list,
                       standardization=env.standardization,
                       scenes=[online_scene],
                       attention_radius=env.attention_radius,
                       robot_type=env.robot_type)


def get_maps_for_input(input_dict, scene, hyperparams):
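    """Crop a per-node map patch for every node type that has a map encoder.

    The scene map is rotated opposite to each agent's heading (taken from the
    heading state index, or inferred from a velocity/heading vector) so the
    cropped patch is aligned with the agent, then batched per node.
    """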
    scene_maps = list()
    scene_pts = list()
    heading_angles = list()
    patch_sizes = list()
    nodes_with_maps = list()
    for node in input_dict:
        if node.type in hyperparams['map_encoder']:
            x = input_dict[node]
            me_hyp = hyperparams['map_encoder'][node.type]
            if 'heading_state_index' in me_hyp:
                heading_state_index = me_hyp['heading_state_index']
                # We have to rotate the map in the opposite direction of the agent to match them up.
                if type(heading_state_index) is list:  # infer from velocity or heading vector
                    heading_angle = -np.arctan2(x[-1, heading_state_index[1]],
                                                x[-1, heading_state_index[0]]) * 180 / np.pi
                else:
                    heading_angle = -x[-1, heading_state_index] * 180 / np.pi
            else:
                heading_angle = None

            scene_map = scene.map[node.type]
            map_point = x[-1, :2]

            patch_size = hyperparams['map_encoder'][node.type]['patch_size']

            scene_maps.append(scene_map)
            scene_pts.append(map_point)
            heading_angles.append(heading_angle)
            patch_sizes.append(patch_size)
            nodes_with_maps.append(node)

    if heading_angles[0] is None:
        heading_angles = None
    else:
        heading_angles = torch.Tensor(heading_angles)

    maps = scene_maps[0].get_cropped_maps_from_scene_map_batch(scene_maps,
                                                               scene_pts=torch.Tensor(scene_pts),
                                                               patch_size=patch_sizes[0],
                                                               rotation=heading_angles)

    maps_dict = {node: maps[[i]] for i, node in enumerate(nodes_with_maps)}
    return maps_dict


class PredictionServer:
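    """Online Trajectron++ prediction stage of the trap pipeline.

    Subscribes to pickled Frame objects on the trajectory ZMQ socket, runs
    incremental Trajectron++ inference on each frame's tracks, attaches the
    used histories and sampled predictions to those tracks, and republishes
    the augmented frames on the prediction socket.
    """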

    def __init__(self, config: Namespace, is_running: Event):
        self.config = config
        self.is_running = is_running

        if self.config.eval_device == 'cpu':
            logger.warning("Running on CPU. Specifying --eval_device cuda:0 should dramatically speed up prediction")

        if self.config.smooth_predictions:
            self.smoother = Smoother(window_len=4)

        context = zmq.Context()
        self.trajectory_socket: zmq.Socket = context.socket(zmq.SUB)
        self.trajectory_socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.trajectory_socket.setsockopt(zmq.CONFLATE, 1)  # only keep the last message; must be set BEFORE connect!
        self.trajectory_socket.connect(config.zmq_trajectory_addr)

        self.prediction_socket: zmq.Socket = context.socket(zmq.PUB)
        self.prediction_socket.bind(config.zmq_prediction_addr)
        # print(self.prediction_socket)

    def run(self):
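        """Load the model and hyperparameters, then loop: receive frames, predict, publish."""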
        if self.config.seed is not None:
            random.seed(self.config.seed)
            np.random.seed(self.config.seed)
            torch.manual_seed(self.config.seed)
            if torch.cuda.is_available():
                torch.cuda.manual_seed_all(self.config.seed)

        # Choose one of the model directory names under the experiment/*/models folders.
        # Possibilities are 'vel_ee', 'int_ee', 'int_ee_me', or 'robot'
        # model_dir = os.path.join(self.config.log_dir, 'int_ee')
        # model_dir = 'models/models_04_Oct_2023_21_04_48_eth_vel_ar3'

        # Load hyperparameters from json
        config_file = os.path.join(self.config.model_dir, self.config.conf)
        if not os.path.exists(config_file):
            raise ValueError('Config json not found!')
        with open(config_file, 'r') as conf_json:
            hyperparams = json.load(conf_json)

        # Add hyperparams from arguments
        hyperparams['dynamic_edges'] = self.config.dynamic_edges
        hyperparams['edge_state_combine_method'] = self.config.edge_state_combine_method
        hyperparams['edge_influence_combine_method'] = self.config.edge_influence_combine_method
        hyperparams['edge_addition_filter'] = self.config.edge_addition_filter
        hyperparams['edge_removal_filter'] = self.config.edge_removal_filter
        hyperparams['batch_size'] = self.config.batch_size
        hyperparams['k_eval'] = self.config.k_eval
        hyperparams['offline_scene_graph'] = self.config.offline_scene_graph
        hyperparams['incl_robot_node'] = self.config.incl_robot_node
        hyperparams['edge_encoding'] = not self.config.no_edge_encoding
        hyperparams['use_map_encoding'] = self.config.map_encoding
        # hyperparams['maximum_history_length'] = 12  # test

        logger.info(f"Use hyperparams: {hyperparams=}")

        output_save_dir = os.path.join(self.config.output_dir, 'pred_figs')
        pathlib.Path(output_save_dir).mkdir(parents=True, exist_ok=True)

        with open(self.config.eval_data_dict, 'rb') as f:
            eval_env = dill.load(f, encoding='latin1')

        if eval_env.robot_type is None and hyperparams['incl_robot_node']:
            eval_env.robot_type = eval_env.NodeType[0]  # TODO: Make more general, allow the user to specify?
            for scene in eval_env.scenes:
                scene.add_robot_from_nodes(eval_env.robot_type)

        logger.info('Loaded data from %s' % (self.config.eval_data_dict,))

        # Creating a dummy environment with a single scene that contains information about the world.
        # When using this code, feel free to use whichever scene index or initial timestep you wish.
        scene_idx = 0

        # You need to have at least acceleration, so you want 2 timesteps of prior data, e.g. [0, 1],
        # so that you can immediately start incremental inference from the 3rd timestep onwards.
        init_timestep = 2

        eval_scene = eval_env.scenes[scene_idx]
        online_env = create_online_env(eval_env, hyperparams, scene_idx, init_timestep)

        # auto-find the highest training iteration among the stored checkpoints
        model_registrar = ModelRegistrar(self.config.model_dir, self.config.eval_device)
        model_iterations = pathlib.Path(self.config.model_dir).glob('model_registrar-*.pt')
        highest_iter = max([int(p.stem.split('-')[-1]) for p in model_iterations])

        model_registrar.load_models(iter_num=highest_iter)

        trajectron = OnlineTrajectron(model_registrar,
                                      hyperparams,
                                      self.config.eval_device)

        # If you want to see what different robot futures do to the predictions, uncomment this line as well as
        # related "... += adjustment" lines below.
        # adjustment = np.stack([np.arange(13)/float(i*2.0) for i in range(6, 12)], axis=1)

        # Here's how you'd incrementally run the model, e.g. with streaming data.
        trajectron.set_environment(online_env, init_timestep)

        timestep = init_timestep + 1
        prev_run_time = 0
        while self.is_running.is_set():
            timestep += 1

            # this_run_time = time.time()
            # logger.debug(f'test {prev_run_time - this_run_time}')
            # time.sleep(max(0, prev_run_time - this_run_time + .5))
            # prev_run_time = time.time()

            # TODO: see process_data.py on how to create a node, then provide nodes + incoming data columns
            # data_columns = pd.MultiIndex.from_product([['position', 'velocity', 'acceleration'], ['x', 'y']])
            # x = node_values[:, 0]
            # y = node_values[:, 1]
            # vx = derivative_of(x, scene.dt)
            # vy = derivative_of(y, scene.dt)
            # ax = derivative_of(vx, scene.dt)
            # ay = derivative_of(vy, scene.dt)

            # data_dict = {('position', 'x'): x,
            #              ('position', 'y'): y,
            #              ('velocity', 'x'): vx,
            #              ('velocity', 'y'): vy,
            #              ('acceleration', 'x'): ax,
            #              ('acceleration', 'y'): ay}

            # node_data = pd.DataFrame(data_dict, columns=data_columns)
            # node = Node(node_type=env.NodeType.PEDESTRIAN, node_id=node_id, data=node_data)
            if self.config.predict_training_data:
                input_dict = eval_scene.get_clipped_input_dict(timestep, hyperparams['state'])
            else:
                zmq_ev = self.trajectory_socket.poll(timeout=2000)
                if not zmq_ev:
                    # no data arrived; loop again so that is_running is still checked
                    continue

                data = self.trajectory_socket.recv()
                frame: Frame = pickle.loads(data)
                # trajectory_data = {t.track_id: t.get_projected_history_as_dict(frame.H) for t in frame.tracks.values()}
                # trajectory_data = json.loads(data)
                # logger.debug(f"Receive {frame.index}")

                # class FakeNode:
                #     def __init__(self, node_type: NodeType):
                #         self.type = node_type

                input_dict = {}
                for identifier, track in frame.tracks.items():
                    # if len(trajectory['history']) < 7:
                    #     # TODO: these trajectories should still be in the output, but without predictions
                    #     continue

                    # TODO: modify this into a mapping function between JS data and the expected Node format
                    # node = FakeNode(online_env.NodeType.PEDESTRIAN)
                    history = [[h['x'], h['y']] for h in track.get_projected_history_as_dict(frame.H, self.config.camera)]
                    history = np.array(history)
                    x = history[:, 0]
                    y = history[:, 1]
                    # TODO: calculate dt based on input
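                    # One hedged sketch for that TODO (assumes Frame objects carry a
                    # monotonic `frame.time` stamp, which the logging below relies on,
                    # and that the previous stamp is kept between loop iterations):
                    #   dt = frame.time - prev_frame_time
                    #   prev_frame_time = frame.time
                    #   vx = derivative_of(x, dt)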
                    vx = derivative_of(x, 0.1)  # eval_scene.dt
                    vy = derivative_of(y, 0.1)
                    ax = derivative_of(vx, 0.1)
                    ay = derivative_of(vy, 0.1)

                    data_dict = {('position', 'x'): x[:],       # [-10:-1]
                                 ('position', 'y'): y[:],       # [-10:-1]
                                 ('velocity', 'x'): vx[:],      # [-10:-1]
                                 ('velocity', 'y'): vy[:],      # [-10:-1]
                                 ('acceleration', 'x'): ax[:],  # [-10:-1]
                                 ('acceleration', 'y'): ay[:]}  # [-10:-1]
                    data_columns = pd.MultiIndex.from_product([['position', 'velocity', 'acceleration'], ['x', 'y']])

                    node_data = pd.DataFrame(data_dict, columns=data_columns)
                    node = Node(
                        node_type=online_env.NodeType.PEDESTRIAN,
                        node_id=identifier,
                        data=node_data,
                        first_timestep=timestep
                    )
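
                    # Hand the model only the newest state row for this node, ordered
                    # [x, y, vx, vy, ax, ay] to match hyperparams['state'] for PEDESTRIAN.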
                    input_dict[node] = np.array([x[-1], y[-1], vx[-1], vy[-1], ax[-1], ay[-1]])

            # print(input_dict)

            if not len(input_dict):
                # skip inference if our input is empty, but still forward the frame
                # TODO: we want to send out an empty result...
                # ...and want to update the network

                # data = json.dumps({})
                self.prediction_socket.send_pyobj(frame)

                continue

            maps = None
            if hyperparams['use_map_encoding']:
                maps = get_maps_for_input(input_dict, eval_scene, hyperparams)
                # print(maps)

            # robot_present_and_future = None
            # if eval_scene.robot is not None and hyperparams['incl_robot_node']:
            #     robot_present_and_future = eval_scene.robot.get(np.array([timestep,
            #                                                               timestep + hyperparams['prediction_horizon']]),
            #                                                     hyperparams['state'][eval_scene.robot.type],
            #                                                     padding=0.0)
            #     robot_present_and_future = np.stack([robot_present_and_future, robot_present_and_future], axis=0)
            #     # robot_present_and_future += adjustment

            start = time.time()
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')  # prevent deluge of UserWarnings from torch's rnn.py

                # In OnlineMultimodalGenerativeCVAE (see trajectron.model.online_mgcvae.py) each node's distribution
                # is stored in self.latent.p_dist by OnlineMultimodalGenerativeCVAE.p_z_x(). Type: torch.distributions.OneHotCategorical
                # Sampling happens later in discrete_latent.py: DiscreteLatent.sample_p()
                dists, preds = trajectron.incremental_forward(input_dict,
                                                              maps,
                                                              prediction_horizon=self.config.prediction_horizon,  # TODO: make variable
                                                              num_samples=self.config.num_samples,  # TODO: make variable
                                                              full_dist=self.config.full_dist,  # "The model's full sampled output, where z and y are sampled sequentially"
                                                              gmm_mode=self.config.gmm_mode,  # "If True: The mode of the Gaussian Mixture Model (GMM) is sampled (see trajectron.model.mgcvae.py)"
                                                              z_mode=self.config.z_mode  # "Predictions from the model's most-likely high-level latent behavior mode" (see trajectron.model.components.discrete_latent:sample_p(most_likely_z=z_mode))
                                                              )
            end = time.time()
            logger.debug("took %.2f s (= %.2f Hz) w/ %d nodes and %d edges" % (end - start,
                                                                               1. / (end - start), len(trajectron.nodes),
                                                                               trajectron.scene_graph.get_num_edges()))

            # unsure what this bit from online_prediction.py does:
            # detailed_preds_dict = dict()
            # for node in eval_scene.nodes:
            #     if node in preds:
            #         detailed_preds_dict[node] = preds[node]

            # adapted from trajectron.visualization
            # prediction_dict provides the actual predictions
            # histories_dict provides the trajectory used for prediction
            # futures_dict is the Ground Truth, which is unavailable in an online setting

            prediction_dict, histories_dict, futures_dict = prediction_output_to_trajectories({timestep: preds},
                                                                                              eval_scene.dt,
                                                                                              hyperparams['maximum_history_length'],
                                                                                              hyperparams['prediction_horizon'])
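
            # A single timestep was passed in, so at most one comes back out.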
            assert len(prediction_dict.keys()) <= 1
            if len(prediction_dict.keys()) == 0:
                # nothing to report for this timestep; keep the loop (and server) running
                continue
            ts_key = list(prediction_dict.keys())[0]

            prediction_dict = prediction_dict[ts_key]
            histories_dict = histories_dict[ts_key]
            futures_dict = futures_dict[ts_key]

            response = {}
            logger.debug(f"{histories_dict=}")

            for node in histories_dict:
                history = histories_dict[node]
                # future = futures_dict[node]  # ground truth dict
                predictions = prediction_dict[node]

                if not len(history) or np.isnan(history[-1]).any():
                    continue

                # response[node.id] = {
                #     'id': node.id,
                #     'det_conf': trajectory_data[node.id]['det_conf'],
                #     'bbox': trajectory_data[node.id]['bbox'],
                #     'history': history.tolist(),
                #     'predictions': predictions[0].tolist()  # use batch 0
                # }

                frame.tracks[node.id].predictor_history = history.tolist()
                frame.tracks[node.id].predictions = predictions[0].tolist()  # use batch 0

            # data = json.dumps(response)
            if self.config.predict_training_data:
                logger.info(f"Frame prediction: {len(trajectron.nodes)} nodes & {trajectron.scene_graph.get_num_edges()} edges. Trajectron: {end - start}s")
            else:
                logger.info(f"Total frame delay = {time.time()-frame.time}s ({len(trajectron.nodes)} nodes & {trajectron.scene_graph.get_num_edges()} edges. Trajectron: {end - start}s)")

            if self.config.smooth_predictions:
                frame = self.smoother.smooth_frame_predictions(frame)

            self.prediction_socket.send_pyobj(frame)
        logger.info('Stopping')


def run_prediction_server(config: Namespace, is_running: Event):
    # attempt to trace the warnings coming from pytorch
    # def warn_with_traceback(message, category, filename, lineno, file=None, line=None):
    #     log = file if hasattr(file, 'write') else sys.stderr
    #     traceback.print_stack(file=log)
    #     log.write(warnings.formatwarning(message, category, filename, lineno, line))
    # warnings.showwarning = warn_with_traceback

    s = PredictionServer(config, is_running)
    s.run()
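

# Minimal launch sketch (hypothetical; the real entry point elsewhere in trap builds
# `config` with argparse, and the Namespace needs many more fields than shown here,
# e.g. eval_device, model_dir, conf, zmq_trajectory_addr, zmq_prediction_addr, ...).
# Note that is_running must be set before run(), since the main loop checks it:
#
#   from multiprocessing import Event
#
#   is_running = Event()
#   is_running.set()
#   run_prediction_server(config, is_running)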