trap/trap/lidar_tracker.py
2025-10-28 16:03:57 +01:00

823 lines
No EOL
31 KiB
Python

from argparse import ArgumentParser
from collections import defaultdict, deque, namedtuple
from copy import copy
from pathlib import Path
import select
import socket
from typing import DefaultDict, List
import numpy as np
import open3d as o3d
from sklearn.cluster import DBSCAN
from scipy.spatial.distance import cdist
import logging
import time
from trap.base import Camera, Detection, Frame, Track, UndistortedCamera
from trap.node import Node
from bytetracker import BYTETracker
import velodyne_decoder as vd
logger = logging.getLogger('lidar')
# import sort
# or ByteTrack
# This is what is used by velodyne_decoder internally on read_pcap:
ResultTuple = namedtuple("StampCloudTuple", ("stamp", "points"))
# In bag reader this is: Result = namedtuple("ResultTuple", ("stamp", "points", "topic", "frame_id"))
# like velodyne_decoder.read_pcap but with live streamed data
# see https://github.com/valgur/velodyne_decoder/issues/4
def read_live_data(ip, port, config, as_pcl_structs=False):
decoder = vd.StreamDecoder(config)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# To increase the buffer at network device, you might want to run sudo sysctl -w net.core.rmem_max=4194304 && sudo sysctl -w net.core.rmem_default=4194304
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024) # 4MB
sock.bind((ip, port))
sock.setblocking(False)
inputs = [sock]
while True:
readable, _, _ = select.select([sock], [], [], 1.0)
latest_data = {}
if sock in readable:
# Drain all packets, keep only the last one, this prevents buffer build-up
# in situations where the processing is not keeping up with the LiDAR speed
while True:
try:
data, addr = sock.recvfrom(vd.PACKET_SIZE * 2)
recv_stamp = time.time()
result = decoder.decode(recv_stamp, data, as_pcl_structs)
if result is not None:
latest_data[addr] = ResultTuple(*result)
except BlockingIOError:
break # No more packets in the queue
for addr, data in latest_data.items():
yield data
else:
logger.warning("No LiDAR data in the last second.")
def filter_raw_points_by_distance(points: np.ndarray, center: np.ndarray, radius):
"""
Filters an Open3D point cloud based on distance from a center point.
Args:
pointcloud: An Open3D PointCloud object.
center: A NumPy array representing the center point (x, y, z).
radius: The maximum distance from the center point to keep points.
Returns:
A new Open3D PointCloud object containing only the points within the specified radius.
"""
distances = np.linalg.norm(points - center, axis=1) # Calculate distance from each point to the center
indices = np.where(distances <= radius)[0] # Find indices of points within the radius
filtered_points = points[indices]
return filtered_points
class LidarTracker(Node):
def setup(self):
if self.config.smooth_tracks:
self.logger.warning("Smoothing not yet implemented")
self.track_sock = self.pub(self.config.zmq_trajectory_addr)
calibration = vd.Calibration.read( "VLP-16.yaml")
config = vd.Config(model=vd.Model.VLP16, calibration=calibration)
if self.config.pcap and self.config.pcap.exists():
# self.sequence_generator = vd.read_pcap(self.config.pcap, config, time_range=[1761139039+100,None])
self.sequence_generator = vd.read_pcap(self.config.pcap, config, time_range=[1761139039+30,None])
# self.sequence_generator = vd.read_pcap(self.config.pcap, config)
else:
self.sequence_generator = read_live_data(self.config.ip, self.config.port, config)
self.tracker = BYTETracker(frame_rate=10)
self.tracks: DefaultDict[str, Track] = defaultdict(lambda: Track())
self.room_filter = VoxelOccupancyFilter(
voxel_size=.5,
history_size=80,
history_interval=8, #not every frame needs to impact history
ready_for_action_at=25,
static_threshold=0.3
)
def run(self):
if self.config.viz:
vis = o3d.visualization.Visualizer()
vis.create_window()
# Set the point size
render_option = vis.get_render_option()
render_option.point_size = 1.0 # Smaller point size
render_option.background_color = np.array([0,0,0]) # Smaller point size
# render_option.camera
view_ctl = vis.get_view_control()
# front_direction = np.array([0,0,-1])
# front_direction = front_direction / np.linalg.norm(front_direction) # Normalize!
# view_ctl.set_front(front_direction.tolist()) # Convert to list
# view_ctl.set_lookat([0, 0, 0])
# view_ctl.set_up([0, 0, 1])
# view_ctl.set_zoom(1)
# view_ctl.set_front([0, 0, -1])
# view_ctl.set_up([1, 0, 0])
# view_ctl.set_zoom(1.5)
geom_added = False
live_pcd = o3d.geometry.PointCloud()
live_pcd.paint_uniform_color([0.2, 0.2, 0.7])
live_filtered_pcd = o3d.geometry.PointCloud()
live_filtered_pcd.paint_uniform_color([0.0, 0.7, 0.2])
coordinate_indicator = o3d.geometry.TriangleMesh.create_coordinate_frame()
vis.add_geometry(coordinate_indicator, False)
ground_lines = o3d.geometry.LineSet(
o3d.utility.Vector3dVector([[0,0,0], [4,4,0], [4,-4,0], [-4,-4,0], [-4,4,0]]),
o3d.utility.Vector2iVector([[0,1], [0,2], [0,3], [0,4], [1,2], [2,3], [3,4], [4,1]])
)
ground_lines.paint_uniform_color([0,1,0])
vis.add_geometry(ground_lines, False)
ground_normal = np.array([1,1,0])
found_planes = False
transform_matrix = np.array([[0.8457653099655785, -0.4625540400071917, -0.26594134791689383, -6.176546342430921], [-0.4412543343066861, -0.886591089552013, 0.13874744099455633, 4.588060710249307], [-0.29995941877926136, -4.340500985936626e-17, -0.9539519626719197, 0.6514765464071752], [0.0, 0.0, 0.0, 1.0]])
# transform_matrix = np.array([[0.8393535624356702, -0.4568202093538991, -0.2946199137064719, 7.480984770326982], [-0.46710069069557, -0.8833469220220505, 0.03892505755796417, 1.2182001838057144], [-0.2780333468817559, 0.1049452794556715, -0.9548214211786936, -1.598860001424085], [0.0, 0.0, 0.0, 1.0]])
camera = UndistortedCamera()
frame_idx = 0
while self.run_loop():
frame_idx += 1
stamp, lidar_points = next(self.sequence_generator)
# print(len(lidar_points))
# print(stamp)
raw_frame_points = velodyne_to_points(lidar_points)
raw_frame_points = filter_raw_points_by_distance(raw_frame_points, np.array([0,0,0]), 15)
current_pcd = o3d.geometry.PointCloud()
current_pcd.points = points_to_o3d(raw_frame_points)
R = current_pcd.get_rotation_matrix_from_xyz((0,0, np.pi / 4+.1))
current_pcd.rotate(R)
current_pcd.translate( [9.7, -11.14, 0])
current_pcd = flip_points_along_axis(current_pcd, 1)
# current_pcd.scale(np.array([1,-1, 1]), )
# current_pcd.transform(transform_matrix)
# if not found_planes:
# self.planes = segment_planes(current_pcd, 4, distance_threshold=.2)
# found_planes = True
# self.logger.info(f"Found {len(self.planes)} planes in points cloud")
# if self.config.viz:
# for i, plane in enumerate(self.planes):
# color = [0,0,0]
# color[i%3] = 1
# # color = [1*i/len(self.planes),0,0]
# plane.paint_uniform_color(color)
# print(f"Add plane {len(plane.points)} points")
# vis.add_geometry(plane, True)
# a = time.perf_counter()
filtered_pcd = self.room_filter.remove_static(current_pcd)
stat_static = len(filtered_pcd.points)
if self.config.viz:
live_pcd.points = current_pcd.points
if self.room_filter.initialised:
# filtered_pcd, _ = filtered_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
filtered_pcd, _ = filtered_pcd.remove_radius_outlier(nb_points=5, radius=0.2)
stat_denoise = len(filtered_pcd.points)
# down sample
filtered_pcd = filtered_pcd.voxel_down_sample(voxel_size=0.05)
if self.config.viz:
live_filtered_pcd.points = filtered_pcd.points
# live_pcd.colors = o3d.utility.Vector3dVector(np.asarray(rgb_image)[mask].reshape(-1, 3) / 255.0)
stat_downsampled = len(filtered_pcd.points)
# b = time.perf_counter()
# filtered_pcd2 = room_filter2.remove_static(current_pcd)
# c = time.perf_counter()
# print(f"after static object removal: {stat_static} / after outlier removal: {stat_denoise} / after_downsample: {stat_downsampled}")
# pcd.points = filtered_pcd.points
points_2d = project_to_xy(filtered_pcd)
# points_2d = project_to_plane(filtered_pcd, ground_normal)
# non_ground = remove_ground(pcd)
# points_2d = project_to_plane(pro)
# points_2d = non_ground[:, :2]
labels = cluster_2d(points_2d)
# boxes, centroids = get_cluster_boxes(points_2d, labels, min_area= 0.3*0.3)
boxes, centroids = get_cluster_boxes(points_2d, labels, min_area= self.config.min_box_area)
# append confidence and class (placeholders)
detections = np.ones((boxes.shape[0],boxes.shape[1]+2));
detections[:,:-2] = boxes
# detections = np.array([
# # coords in xyxy
# box.tolist() + [1,1] for box in boxes
# ])
online_tracks = self.tracker.update(detections)
self.logger.debug(f"online tracks: {[t[4] for t in online_tracks]}")
removed_tracks = self.tracker.removed_stracks
# active_stracks = [track for track in self.tracker.tracked_stracks if track.is_activated]
active_stracks = [track for track in self.tracker.tracked_stracks if track.is_activated]
detections = [Detection.from_bytetrack(track, frame_idx) for track in active_stracks]
for detection in detections:
track = self.tracks[detection.track_id]
track.track_id = detection.track_id # for new tracks
track.fps = 10
track.frame_index = frame_idx
track.updated_at = time.time()
# track.fps = self.config.camera.fps # for new tracks
track.history.append(detection) # add to history
for t in removed_tracks:
if t.track_id in self.tracks:
del self.tracks[t.track_id]
# TODO: fix this oddity:
# else:
# logger.debug(f"removing non-existing track? id: {t.track_id}")
# reset for each iteration
# self.tracker.removed_stracks = []
active_track_ids = [d.track_id for d in detections]
active_tracks = {t.track_id: t.get_with_interpolated_history() for t in self.tracks.values() if t.track_id in active_track_ids}
# active_tracks = displacement_filter.apply_to_dict(active_tracks, frame.camera)# a filter to remove just detecting static objects
# frame = Frame(frame_idx, None, time.time(), self.tracks, camera.H, camera)
frame = Frame(frame_idx, None, time.time(), active_tracks,
camera.H, camera)
if self.config.smooth_tracks:
frame = self.smoother.smooth_frame_tracks(frame)
self.track_sock.send_pyobj(frame)
if len(centroids):
self.logger.debug(f"{centroids=}")
if self.config.viz:
line_sets = []
for box in boxes:
line_sets.append(box_to_lineset(box, 0, [.3,.3,.3]))
for track in active_stracks:
color = RGB_COLORS[track.track_id % len(RGB_COLORS)]
line_sets.append(box_to_lineset(track.tlbr, 0, color))
for line_set in line_sets:
vis.add_geometry(line_set, False)
elif self.config.viz:
line_sets = []
live_filtered_pcd.points = o3d.utility.Vector3dVector(np.empty([0,3]))
# tracks = tracker.update(centroids)
# Build tracked spheres
spheres = []
# for _, pos in tracks.items():
# sphere = o3d.geometry.TriangleMesh.create_sphere(radius=0.15)
# sphere.translate([pos[0], pos[1], 0])
# sphere.paint_uniform_color([1, 0, 0])
# spheres.append(sphere)
if self.config.viz:
color = [0.5, 0.5, 0.7] if self.room_filter.initialised else [0.7, 0.7, 0.7]
live_pcd.paint_uniform_color(color)
live_filtered_pcd.paint_uniform_color([0.2, 0.7, 0.2])
if not geom_added:
vis.add_geometry(live_filtered_pcd, True)
vis.add_geometry(live_pcd, True)
for s in spheres:
vis.add_geometry(s, False)
geom_added = True
else:
# vis.remove_geometry(live_pcd, False)
vis.update_geometry(live_pcd)
vis.update_geometry(live_filtered_pcd)
for s in spheres:
vis.add_geometry(s, False)
vis.poll_events()
vis.update_renderer()
# time.sleep(0.1)
for s in line_sets:
vis.remove_geometry(s, False)
for s in spheres:
vis.remove_geometry(s, False)
if self.config.viz:
vis.destroy_window()
@classmethod
def arg_parser(cls):
argparser = ArgumentParser()
argparser.add_argument('--zmq-trajectory-addr',
help='Manually specity communication addr for the trajectory messages',
type=str,
default="ipc:///tmp/feeds_traj")
argparser.add_argument('--ip',
help='IP of this computer on which to listen for IP packets broadcast by lidar (so NOT the ip of the Lidar itself)',
type=str,
default="192.168.0.70")
argparser.add_argument('--port',
help='Port of the incomming ip packets',
type=int,
default=2368)
argparser.add_argument('--pcap',
help='Read pcap instead of socket',
type=Path,
default=None)
# argparser.add_argument('--pcap-offset',
# help='offset in seconds',
# type=int,
# default=None)
argparser.add_argument('--min-box-area',
help='Minimum area of bounding boxes to consider them for tracking',
type=float,
default=.3*.2)
argparser.add_argument('--max-box-area',
help='Maximum area (m2) of bounding boxes to consider them for tracking',
type=float,
default=2)
argparser.add_argument("--smooth-tracks",
help="Smooth the tracker tracks before sending them to the predictor",
action='store_true')
argparser.add_argument("--viz",
help="Render pointclouds in open3d",
action='store_true')
return argparser
# ==== Ground Removal ====
def find_ground(pcd: o3d.geometry.PointCloud, distance_threshold=0.1):
_, inliers = pcd.segment_plane(distance_threshold=distance_threshold,
ransac_n=3,
num_iterations=100)
ground = pcd.select_by_index(inliers)
non_ground = pcd.select_by_index(inliers, invert=True)
return np.asarray(non_ground.points), np.asarray(ground)
def segment_planes(pcd, num_planes = 5, num_iterations=100, distance_threshold=0.1) -> List[o3d.geometry.PointCloud]:
"""
Segments a point cloud into multiple planes using RANSAC.
Args:
pcd: The input point cloud.
num_iterations: The number of RANSAC iterations.
distance_threshold: The maximum distance for a point to be considered an inlier.
Returns:
A list of point clouds, each representing a segmented plane.
"""
planes = []
remaining_pcd = copy(pcd)
for i in range(num_planes):
if len(remaining_pcd.points) <= 100: # Stop when very few points remain
break
# Perform RANSAC plane segmentation
plane_def, inliers = remaining_pcd.segment_plane(distance_threshold=distance_threshold,
ransac_n=3,
num_iterations=num_iterations)
# Extract the plane
plane = remaining_pcd.select_by_index(inliers)
# Add the plane to the list
planes.append(plane)
# Remove the plane from the remaining point cloud
remaining_pcd = remaining_pcd.select_by_index(inliers, invert=True)
return planes
# ==== Clustering and Centroids ====
def cluster_2d(points_xy, eps=0.5, min_samples=5):
if not len(points_xy):
return []
db = DBSCAN(eps=eps, min_samples=min_samples).fit(points_xy)
return db.labels_
def compute_centroids(points_xy, labels):
centroids = []
for label in set(labels):
if label == -1:
continue
cluster = points_xy[labels == label]
centroid = cluster.mean(axis=0)
centroids.append(centroid)
return np.array(centroids)
def get_cluster_boxes(points_xy, labels, min_area = 0):
if not len(labels):
return np.empty([0,4]), np.empty([0,2])
boxes = [] # Each box: [x1, y1, x2, y2]
centroids = []
unique_labels = set(labels)
for label in unique_labels:
if label == -1:
continue # Skip noise
cluster = points_xy[labels == label]
x_min, y_min = cluster.min(axis=0)
x_max, y_max = cluster.max(axis=0)
area = (x_max-x_min) * (y_max - y_min)
if area < min_area:
logger.warning(f"Dropping box {area} ")
continue
centroid = cluster.mean(axis=0)
boxes.append([x_min, y_min, x_max, y_max])
centroids.append(centroid)
if not len(boxes):
return np.empty([0,4]), np.empty([0,2])
return np.array(boxes), np.array(centroids)
def box_to_lineset(box, z_height=0.0, color=[1.0, 0.0, 0.0]) -> List[o3d.geometry.LineSet]:
"""Draws a list of 2D bounding boxes onto an Open3D visualization.
Args:
boxes (list): A list of bounding boxes, where each box is a list
of [x_min, y_min, x_max, y_max].
z_height (float): The z-coordinate height of the boxes.
color (list): The color of the boxes (RGB).
"""
x_min, y_min, x_max, y_max = box
# Define the 8 corners of the box in 3D
corners = np.array([
[x_min, y_min, z_height],
[x_max, y_min, z_height],
[x_max, y_max, z_height],
[x_min, y_max, z_height],
[x_min, y_min, z_height + 1.0], # Top corners
[x_max, y_min, z_height + 1.0],
[x_max, y_max, z_height + 1.0],
[x_min, y_max, z_height + 1.0],
])
# Create lines from the corners to form the edges of the box
line_idxs = []
for i in range(8):
for j in range(i + 1, 8):
line_idxs.append([i, j])
line_idxs = np.array(line_idxs)
points = o3d.utility.Vector3dVector(corners)
lines = o3d.utility.Vector2iVector(line_idxs)
# Create a LineSet from the lines
line_set = o3d.geometry.LineSet(points, lines)
line_set.colors = o3d.utility.Vector3dVector(np.tile(color, (len(line_idxs), 1))) # Assign color
return line_set
def velodyne_to_points(points) -> o3d.geometry.PointCloud:
"""lidar points have additional metadata. Strip that for clustering
"""
if points is None:
return []
try:
points_np = np.array(points, dtype=np.float64)
if points_np.shape[1] != 8:
raise RuntimeError(f"Warning: Point cloud has unexpected shape {points_np.shape}. Expected (N, 8). Skipping.")
# Extract x, y, and z coordinates
x = points_np[:, 0]
y = points_np[:, 1]
z = points_np[:, 2]
# Stack the coordinates to create a (N, 3) array
points_xyz = np.stack([x, y, z], axis=1)
except ValueError as e:
raise RuntimeError(f"Error converting points to NumPy array: {e}. Skipping.")
return points_xyz
def points_to_o3d(points_xyz: np.array) -> o3d.geometry.PointCloud:
return o3d.utility.Vector3dVector(points_xyz)
class VoxelOccupancyFilter:
def __init__(self, voxel_size=0.2, history_size=150, static_threshold=0.2, history_interval=1, ready_for_action_at=None):
"""
Initializes the voxel occupancy filter.
Parameters:
voxel_size (float): Size of each voxel cube in meters.
history_size (int): Number of past frames to keep.
static_threshold (float): Fraction of frames a voxel must appear in to be considered static.
"""
self.voxel_size = voxel_size
self.history_size = history_size
self.static_threshold = static_threshold
self.voxel_history = deque(maxlen=history_size)
self.static_voxels = {}
self.ready_for_action_at = ready_for_action_at if ready_for_action_at else history_size
self.initialised = False
self.history_interval = history_interval
self.i = -1
def _point_cloud_to_voxel_set(self, pcd):
"""
Converts a point cloud to a set of voxel coordinate tuples.
"""
points = np.asarray(pcd.points)
voxel_coords = np.floor(points / self.voxel_size).astype(np.int32)
return set(map(tuple, voxel_coords))
def remove_static(self, current_pcd):
"""
Removes static points from the current point cloud.
Parameters:
current_pcd (open3d.geometry.PointCloud): Current point cloud frame.
Returns:
open3d.geometry.PointCloud: Filtered point cloud with static points removed.
"""
self.i = (self.i + 1) % self.history_interval
# Filter out static points from current point cloud
filtered_points = []
for pt in np.asarray(current_pcd.points):
voxel = tuple(np.floor(pt / self.voxel_size).astype(np.int32))
if voxel not in self.static_voxels:
filtered_points.append(pt)
# Update history: until full, from there every n frame
# this way, minimal history is necessary to still to encode participants, while
# handling more semi-permanent changes to the space
if self.i == 0 or len(self.voxel_history) < self.history_size:
current_voxel_set = self._point_cloud_to_voxel_set(current_pcd)
self.voxel_history.append(current_voxel_set)
voxel_counts = {}
# Count voxel occurrences in history
for voxel_set in self.voxel_history:
for v in voxel_set:
voxel_counts[v] = voxel_counts.get(v, 0) + 1
# Determine static voxels
self.static_voxels = {v for v, count in voxel_counts.items()
if count / len(self.voxel_history) >= self.static_threshold}
if not self.initialised and len(self.voxel_history) >= self.ready_for_action_at:
logger.info(f"Static object filter ready: {len(self.voxel_history)} historical items, keep max {self.history_size}")
self.initialised = True
# Return filtered point cloud
filtered_pcd = o3d.geometry.PointCloud()
if filtered_points:
filtered_pcd.points = o3d.utility.Vector3dVector(np.array(filtered_points))
return filtered_pcd
class DecayingVoxelOccupancyFilter:
def __init__(self, voxel_size=0.1, decay_rate=0.01, appear_increment=0.01, static_threshold=0.2):
"""
WIP!! Initializes the voxel occupancy filter with decay.
Parameters:
voxel_size (float): Size of each voxel in meters.
decay_rate (float): Amount to decay occupancy per frame.
appear_increment (float): Value added when a voxel appears in current frame.
static_threshold (float): Threshold to consider voxel as static.
"""
self.voxel_size = voxel_size
self.decay_rate = decay_rate
self.appear_increment = appear_increment
self.static_threshold = static_threshold
self.voxel_occupancy = {} # dict: voxel tuple -> occupancy score
def _point_cloud_to_voxel_set(self, pcd):
points = np.asarray(pcd.points)
voxel_coords = np.floor(points / self.voxel_size).astype(np.int32)
return set(map(tuple, voxel_coords))
def remove_static(self, current_pcd):
current_voxels = self._point_cloud_to_voxel_set(current_pcd)
# Decay all existing voxel scores
for voxel in list(self.voxel_occupancy.keys()):
self.voxel_occupancy[voxel] *= (1.0 - self.decay_rate)
if self.voxel_occupancy[voxel] < self.appear_increment:
del self.voxel_occupancy[voxel] # clean up low-score entries
# Update occupancy with current frame
for voxel in current_voxels:
self.voxel_occupancy[voxel] = self.voxel_occupancy.get(voxel, 0.0) + self.appear_increment
self.voxel_occupancy[voxel] = min(self.voxel_occupancy[voxel], 1.0) # clamp to max=1.0
# Identify static voxels
static_voxels = {v for v, score in self.voxel_occupancy.items() if score >= self.static_threshold}
# Filter points
filtered_points = []
for pt in np.asarray(current_pcd.points):
voxel = tuple(np.floor(pt / self.voxel_size).astype(np.int32))
if voxel not in static_voxels:
filtered_points.append(pt)
filtered_pcd = o3d.geometry.PointCloud()
if filtered_points:
filtered_pcd.points = o3d.utility.Vector3dVector(np.array(filtered_points))
return filtered_pcd
# def point_cloud_to_voxel_set(pcd, voxel_size):
# points = np.asarray(pcd.points)
# # Convert to voxel coordinates
# voxel_coords = np.floor(points / voxel_size).astype(np.int32)
# # Use set of tuple coordinates
# return set(map(tuple, voxel_coords))
# def remove_static_points(current_pcd, voxel_history, voxel_size, threshold=STATIC_THRESHOLD):
# current_voxels = point_cloud_to_voxel_set(current_pcd, voxel_size)
# # Count how many times each voxel appears in history
# voxel_counts = {}
# for vh in voxel_history:
# for v in vh:
# voxel_counts[v] = voxel_counts.get(v, 0) + 1
# # Mark voxels that are static
# static_voxels = {v for v, count in voxel_counts.items() if count / HISTORY_SIZE >= threshold}
# # print(len(static_voxels))
# # Filter out static points from current frame
# filtered_points = []
# for pt in np.asarray(current_pcd.points):
# voxel = tuple(np.floor(pt / voxel_size).astype(np.int32))
# if voxel not in static_voxels:
# filtered_points.append(pt)
# filtered_pcd = o3d.geometry.PointCloud()
# filtered_pcd.points = o3d.utility.Vector3dVector(filtered_points)
# return filtered_pcd
def project_to_xy(pcd: o3d.geometry.PointCloud) -> np.ndarray:
"""
Projects a 3D point cloud to the XY plane.
Returns:
2D numpy array of shape (N, 2) with [x, y] coordinates.
"""
points = np.asarray(pcd.points)
return points[:, :2] # Drop Z
def project_to_plane(point_cloud, plane_normal: np.ndarray):
"""Projects a point cloud onto a plane.
Args:
point_cloud (o3d.geometry.PointCloud): The input point cloud.
plane_normal (numpy.ndarray): The normal vector of the plane.
Returns:
o3d.geometry.PointCloud: The projected point cloud.
"""
# 1. Calculate the rotation matrix to align the plane normal with the z-axis
plane_normal = plane_normal / np.linalg.norm(plane_normal) # Normalize
# Calculate a rotation matrix that rotates the plane normal to the z-axis.
# We use the Rodrigues' rotation formula. This requires finding an axis
# perpendicular to both the normal and the z-axis.
z_axis = np.array([0, 0, 1])
rotation_axis = np.cross(plane_normal, z_axis)
rotation_axis = rotation_axis / np.linalg.norm(rotation_axis)
# angle = np.arccos(np.dot(plane_normal, z_axis))
# print(rotation_axis, angle)
rotation_matrix = o3d.geometry.get_rotation_matrix_from_axis_angle(rotation_axis.astype(np.float64))
# 2. Transform the point cloud
transformed_pc = point_cloud.rotate(rotation_matrix, center=(0,0,0))
# 3. Translation to make the plane at z=0
translation_vector = -np.asarray(transformed_pc.get_center())
transformed_pc = transformed_pc.translate(translation_vector)
# 4. Drop Z-coordinate
points = np.asarray(transformed_pc.points)
projected_points = points[:, :2] # Take only x and y coordinates
# projected_pc = o3d.geometry.PointCloud()
# projected_pc.points = o3d.utility.Vector3dVector(projected_points)
return projected_points
RGB_COLORS = [
(255, 0, 0),
(0, 255, 0),
# (0, 0, 255),# blue used for missing map
(0, 255, 255),
(255, 255, 0),
]
def flip_points_along_axis(pcd, axis):
"""
Flips the points of an Open3D PointCloud along a specified axis.
Args:
pcd (open3d.geometry.PointCloud): The input PointCloud.
axis (int): The axis to flip along (0 for X, 1 for Y, 2 for Z).
Returns:
open3d.geometry.PointCloud: The PointCloud with flipped points.
"""
points = np.asarray(pcd.points)
flipped_points = points.copy()
if axis == 0: # Flip along X-axis
flipped_points[:, 0] = -flipped_points[:, 0]
elif axis == 1: # Flip along Y-axis
flipped_points[:, 1] = -flipped_points[:, 1]
elif axis == 2: # Flip along Z-axis
flipped_points[:, 2] = -flipped_points[:, 2]
else:
raise ValueError("Invalid axis. Must be 0, 1, or 2.")
flipped_pcd = o3d.geometry.PointCloud()
flipped_pcd.points = o3d.utility.Vector3dVector(flipped_points)
return flipped_pcd