Different smoothing and filtering before parsing data
This commit is contained in:
		
							parent
							
								
									0f96611771
								
							
						
					
					
						commit
						d6eac14898
					
				
					 7 changed files with 238 additions and 71 deletions
				
			
		| 
						 | 
					@ -77,16 +77,17 @@ class CameraAction(argparse.Action):
 | 
				
			||||||
        if values is None:
 | 
					        if values is None:
 | 
				
			||||||
            setattr(namespace, self.dest, None)
 | 
					            setattr(namespace, self.dest, None)
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            values = Path(values)
 | 
					            camera = Camera.from_calibfile(Path(values), namespace.H, namespace.camera_fps)
 | 
				
			||||||
            with values.open('r') as fp:
 | 
					            # values = Path(values)
 | 
				
			||||||
                data = json.load(fp)
 | 
					            # with values.open('r') as fp:
 | 
				
			||||||
                # print(data)
 | 
					            #     data = json.load(fp)
 | 
				
			||||||
                # print(data['camera_matrix'])
 | 
					            #     # print(data)
 | 
				
			||||||
            # camera = {
 | 
					            #     # print(data['camera_matrix'])
 | 
				
			||||||
            #     'camera_matrix': np.array(data['camera_matrix']), 
 | 
					            # # camera = {
 | 
				
			||||||
            #     'dist_coeff': np.array(data['dist_coeff']),
 | 
					            # #     'camera_matrix': np.array(data['camera_matrix']), 
 | 
				
			||||||
            # }
 | 
					            # #     'dist_coeff': np.array(data['dist_coeff']),
 | 
				
			||||||
            camera = Camera(np.array(data['camera_matrix']), np.array(data['dist_coeff']), data['dim']['width'], data['dim']['height'], namespace.H, namespace.camera_fps)
 | 
					            # # }
 | 
				
			||||||
 | 
					            # camera = Camera(np.array(data['camera_matrix']), np.array(data['dist_coeff']), data['dim']['width'], data['dim']['height'], namespace.H, namespace.camera_fps)
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            setattr(namespace, 'camera', camera)
 | 
					            setattr(namespace, 'camera', camera)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -338,6 +338,9 @@ class CvRenderer:
 | 
				
			||||||
        i=0
 | 
					        i=0
 | 
				
			||||||
        first_time = None
 | 
					        first_time = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        cv2.namedWindow("frame", cv2.WND_PROP_FULLSCREEN)
 | 
				
			||||||
 | 
					        cv2.setWindowProperty("frame",cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        while self.is_running.is_set():
 | 
					        while self.is_running.is_set():
 | 
				
			||||||
            i+=1
 | 
					            i+=1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -385,7 +388,7 @@ class CvRenderer:
 | 
				
			||||||
            if self.streaming_process:
 | 
					            if self.streaming_process:
 | 
				
			||||||
                self.streaming_process.stdin.write(img.tobytes())
 | 
					                self.streaming_process.stdin.write(img.tobytes())
 | 
				
			||||||
            if self.config.render_window:
 | 
					            if self.config.render_window:
 | 
				
			||||||
                cv2.imshow('frame',img)
 | 
					                cv2.imshow('frame',cv2.resize(img, (1920, 1080)))
 | 
				
			||||||
                cv2.waitKey(1)
 | 
					                cv2.waitKey(1)
 | 
				
			||||||
        logger.info('Stopping')
 | 
					        logger.info('Stopping')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -57,10 +57,17 @@ class UrlOrPath():
 | 
				
			||||||
            return Path(self.url.path)
 | 
					            return Path(self.url.path)
 | 
				
			||||||
        return Path(self.url.geturl()) # can include scheme, such as C:/
 | 
					        return Path(self.url.geturl()) # can include scheme, such as C:/
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
 | 
					class Space(IntFlag):
 | 
				
			||||||
 | 
					    Image = 1 # As detected in the image
 | 
				
			||||||
 | 
					    Undistorted = 2 # After applying lense undistortiion
 | 
				
			||||||
 | 
					    World = 4 # After lens undistort and homography
 | 
				
			||||||
 | 
					    Render = 8 # View space of renderer
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
class DetectionState(IntFlag):
 | 
					class DetectionState(IntFlag):
 | 
				
			||||||
    Tentative = 1 # state before n_init (see DeepsortTrack)
 | 
					    Tentative = 1 # state before n_init (see DeepsortTrack)
 | 
				
			||||||
    Confirmed = 2 # after tentative
 | 
					    Confirmed = 2 # after tentative
 | 
				
			||||||
    Lost = 4 # lost when DeepsortTrack.time_since_update > 0 but not Deleted
 | 
					    Lost = 4 # lost when DeepsortTrack.time_since_update > 0 but not Deleted
 | 
				
			||||||
 | 
					    Interpolated = 8 # A position estimated through interpolation of adjecent detections
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
    def from_deepsort_track(cls, track: DeepsortTrack):
 | 
					    def from_deepsort_track(cls, track: DeepsortTrack):
 | 
				
			||||||
| 
						 | 
					@ -83,6 +90,14 @@ class DetectionState(IntFlag):
 | 
				
			||||||
            return cls.Confirmed
 | 
					            return cls.Confirmed
 | 
				
			||||||
        raise RuntimeError("Should not run into Deleted entries here")
 | 
					        raise RuntimeError("Should not run into Deleted entries here")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def H_from_path(path: Path):
 | 
				
			||||||
 | 
					    if path.suffix == '.json':
 | 
				
			||||||
 | 
					        with path.open('r') as fp:
 | 
				
			||||||
 | 
					            H = np.array(json.load(fp))
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        H = np.loadtxt(path, delimiter=',')
 | 
				
			||||||
 | 
					    return H
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@dataclass
 | 
					@dataclass
 | 
				
			||||||
class Camera:
 | 
					class Camera:
 | 
				
			||||||
    mtx: cv2.Mat
 | 
					    mtx: cv2.Mat
 | 
				
			||||||
| 
						 | 
					@ -98,6 +113,27 @@ class Camera:
 | 
				
			||||||
    def __post_init__(self):
 | 
					    def __post_init__(self):
 | 
				
			||||||
        self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w,self.h), 1, (self.w,self.h))
 | 
					        self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w,self.h), 1, (self.w,self.h))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @classmethod
 | 
				
			||||||
 | 
					    def from_calibfile(cls, calibration_path, H, fps):
 | 
				
			||||||
 | 
					        with calibration_path.open('r') as fp:
 | 
				
			||||||
 | 
					            data = json.load(fp)
 | 
				
			||||||
 | 
					            # print(data)
 | 
				
			||||||
 | 
					            # print(data['camera_matrix'])
 | 
				
			||||||
 | 
					        # camera = {
 | 
				
			||||||
 | 
					        #     'camera_matrix': np.array(data['camera_matrix']), 
 | 
				
			||||||
 | 
					        #     'dist_coeff': np.array(data['dist_coeff']),
 | 
				
			||||||
 | 
					        # }
 | 
				
			||||||
 | 
					        return cls(
 | 
				
			||||||
 | 
					            np.array(data['camera_matrix']), 
 | 
				
			||||||
 | 
					            np.array(data['dist_coeff']), 
 | 
				
			||||||
 | 
					            data['dim']['width'], 
 | 
				
			||||||
 | 
					            data['dim']['height'], 
 | 
				
			||||||
 | 
					            H, fps)
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    @classmethod
 | 
				
			||||||
 | 
					    def from_paths(cls, calibration_path, h_path, fps):
 | 
				
			||||||
 | 
					        H = H_from_path(h_path)
 | 
				
			||||||
 | 
					        return cls.from_calibfile(calibration_path, H, fps)
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    # def __init__(self, mtx, dist, w, h, H):
 | 
					    # def __init__(self, mtx, dist, w, h, H):
 | 
				
			||||||
    #     self.mtx = mtx
 | 
					    #     self.mtx = mtx
 | 
				
			||||||
| 
						 | 
					@ -107,6 +143,14 @@ class Camera:
 | 
				
			||||||
    #     self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(mtx, dist, (w,h), 1, (w,h))
 | 
					    #     self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(mtx, dist, (w,h), 1, (w,h))
 | 
				
			||||||
    #     self.H = H # homography
 | 
					    #     self.H = H # homography
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@dataclass
 | 
				
			||||||
 | 
					class Position:
 | 
				
			||||||
 | 
					    x: float
 | 
				
			||||||
 | 
					    y: float
 | 
				
			||||||
 | 
					    conf: float
 | 
				
			||||||
 | 
					    state: DetectionState
 | 
				
			||||||
 | 
					    frame_nr: int
 | 
				
			||||||
 | 
					    det_class: str
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@dataclass
 | 
					@dataclass
 | 
				
			||||||
class Detection:
 | 
					class Detection:
 | 
				
			||||||
| 
						 | 
					@ -120,7 +164,7 @@ class Detection:
 | 
				
			||||||
    frame_nr: int
 | 
					    frame_nr: int
 | 
				
			||||||
    det_class: str
 | 
					    det_class: str
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def get_foot_coords(self) -> list[tuple[float, float]]:
 | 
					    def get_foot_coords(self) -> list[float, float]:
 | 
				
			||||||
        return [self.l + 0.5 * self.w, self.t+self.h]
 | 
					        return [self.l + 0.5 * self.w, self.t+self.h]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @classmethod
 | 
					    @classmethod
 | 
				
			||||||
| 
						 | 
					@ -153,6 +197,16 @@ class Detection:
 | 
				
			||||||
    def to_ltrb(self):
 | 
					    def to_ltrb(self):
 | 
				
			||||||
        return (int(self.l), int(self.t), int(self.l+self.w), int(self.t+self.h))
 | 
					        return (int(self.l), int(self.t), int(self.l+self.w), int(self.t+self.h))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@dataclass
 | 
				
			||||||
 | 
					class Trajectory:
 | 
				
			||||||
 | 
					    # TODO)) Replace history and predictions in Track with Trajectory
 | 
				
			||||||
 | 
					    space: Space
 | 
				
			||||||
 | 
					    fps: int = 12
 | 
				
			||||||
 | 
					    points: List[Detection] = field(default_factory=list)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def __iter__(self):
 | 
				
			||||||
 | 
					        for d in self.points:
 | 
				
			||||||
 | 
					            yield d
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
@dataclass
 | 
					@dataclass
 | 
				
			||||||
| 
						 | 
					@ -162,7 +216,7 @@ class Track:
 | 
				
			||||||
    and acceleration.
 | 
					    and acceleration.
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    track_id: str = None
 | 
					    track_id: str = None
 | 
				
			||||||
    history: List[Detection] = field(default_factory=lambda: [])
 | 
					    history: List[Detection] = field(default_factory=list)
 | 
				
			||||||
    predictor_history: Optional[list] = None # in image space
 | 
					    predictor_history: Optional[list] = None # in image space
 | 
				
			||||||
    predictions: Optional[list] = None
 | 
					    predictions: Optional[list] = None
 | 
				
			||||||
    fps: int = 12
 | 
					    fps: int = 12
 | 
				
			||||||
| 
						 | 
					@ -236,6 +290,50 @@ class Track:
 | 
				
			||||||
            t.predictions,
 | 
					            t.predictions,
 | 
				
			||||||
            t.fps/step_size)
 | 
					            t.fps/step_size)
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
 | 
					    def get_binned(self, bin_size=.5, remove_overlap=True):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        For an experiment: what if we predict using only concrete positions, by mapping 
 | 
				
			||||||
 | 
					        dx,dy to a grid. Thus prediction can be for 8 moves, or rather headings
 | 
				
			||||||
 | 
					        see ~/notes/attachments example svg
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        new_history: List[Detection] = []
 | 
				
			||||||
 | 
					        for i, (det0, det1) in enumerate(zip(self.history[:-1], self.history[1:]):
 | 
				
			||||||
 | 
					            if i == 0:
 | 
				
			||||||
 | 
					                new_history.append(det0)
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					            if abs(det1.x - new_history[-1].x) < bin_size or abs(det1.y - new_history[-1].y) < bin_size:
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # det1 falls outside of the box [-bin_size:+bin_size] around last detection
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # 1. Interpolate exact point between det0 and det1 that this happens
 | 
				
			||||||
 | 
					            if abs(det1.x - new_history[-1].x) >= bin_size:
 | 
				
			||||||
 | 
					                if det1.x - new_history[-1].x >= bin_size:
 | 
				
			||||||
 | 
					                    # det1 left of last
 | 
				
			||||||
 | 
					                    x = new_history[-1].x + bin_size
 | 
				
			||||||
 | 
					                    f = inv_lerp(det0.x, det1.x, x)
 | 
				
			||||||
 | 
					                elif new_history[-1].x - det1.x >= bin_size:
 | 
				
			||||||
 | 
					                    # det1 left of last
 | 
				
			||||||
 | 
					                    x = new_history[-1].x - bin_size
 | 
				
			||||||
 | 
					                    f = inv_lerp(det0.x, det1.x, x)
 | 
				
			||||||
 | 
					                y = lerp(det0.y, det1.y, f)
 | 
				
			||||||
 | 
					            if abs(det1.y - new_history[-1].y) >= bin_size:
 | 
				
			||||||
 | 
					                if det1.y - new_history[-1].y >= bin_size:
 | 
				
			||||||
 | 
					                    # det1 left of last
 | 
				
			||||||
 | 
					                    y = new_history[-1].y + bin_size
 | 
				
			||||||
 | 
					                    f = inv_lerp(det0.y, det1.y, x)
 | 
				
			||||||
 | 
					                elif new_history[-1].y - det1.y >= bin_size:
 | 
				
			||||||
 | 
					                    # det1 left of last
 | 
				
			||||||
 | 
					                    y = new_history[-1].y - bin_size
 | 
				
			||||||
 | 
					                    f = inv_lerp(det0.y, det1.y, x)
 | 
				
			||||||
 | 
					                x = lerp(det0.x, det1.x, f)
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # 2. Find closest point on rectangle (rectangle's four corners, or 4 midpoints)
 | 
				
			||||||
 | 
					            points = [[bin_size, 0], [bin_size, bin_size], [0, bin_size], [-bin_size, bin_size], [-bin_size, 0], [-bin_size, -bin_size], [0, -bin_size], [bin_size, -bin_size]]
 | 
				
			||||||
 | 
					            # todo Offsets to points:[ history for in points]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def to_trajectron_node(self, camera: Camera, env: Environment) -> Node:
 | 
					    def to_trajectron_node(self, camera: Camera, env: Environment) -> Node:
 | 
				
			||||||
        positions = self.get_projected_history(None, camera)
 | 
					        positions = self.get_projected_history(None, camera)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -171,7 +171,7 @@ class PredictionServer:
 | 
				
			||||||
            self.prediction_socket.send_pyobj(frame)
 | 
					            self.prediction_socket.send_pyobj(frame)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def run(self):
 | 
					    def run(self):
 | 
				
			||||||
 | 
					        print(self.config)
 | 
				
			||||||
        if self.config.seed is not None:
 | 
					        if self.config.seed is not None:
 | 
				
			||||||
            random.seed(self.config.seed)
 | 
					            random.seed(self.config.seed)
 | 
				
			||||||
            np.random.seed(self.config.seed)
 | 
					            np.random.seed(self.config.seed)
 | 
				
			||||||
| 
						 | 
					@ -208,18 +208,9 @@ class PredictionServer:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        logger.info(f"Use hyperparams: {hyperparams=}")
 | 
					        logger.info(f"Use hyperparams: {hyperparams=}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        output_save_dir = os.path.join(self.config.output_dir, 'pred_figs')
 | 
					 | 
				
			||||||
        pathlib.Path(output_save_dir).mkdir(parents=True, exist_ok=True)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        with open(self.config.eval_data_dict, 'rb') as f:
 | 
					        with open(self.config.eval_data_dict, 'rb') as f:
 | 
				
			||||||
            eval_env = dill.load(f, encoding='latin1')
 | 
					            eval_env = dill.load(f, encoding='latin1')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if eval_env.robot_type is None and hyperparams['incl_robot_node']:
 | 
					 | 
				
			||||||
            eval_env.robot_type = eval_env.NodeType[0]  # TODO: Make more general, allow the user to specify?
 | 
					 | 
				
			||||||
            for scene in eval_env.scenes:
 | 
					 | 
				
			||||||
                scene.add_robot_from_nodes(eval_env.robot_type)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        logger.info('Loaded data from %s' % (self.config.eval_data_dict,))
 | 
					        logger.info('Loaded data from %s' % (self.config.eval_data_dict,))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Creating a dummy environment with a single scene that contains information about the world.
 | 
					        # Creating a dummy environment with a single scene that contains information about the world.
 | 
				
			||||||
| 
						 | 
					@ -237,6 +228,7 @@ class PredictionServer:
 | 
				
			||||||
        model_registrar = ModelRegistrar(self.config.model_dir, self.config.eval_device)
 | 
					        model_registrar = ModelRegistrar(self.config.model_dir, self.config.eval_device)
 | 
				
			||||||
        model_iterations = pathlib.Path(self.config.model_dir).glob('model_registrar-*.pt')
 | 
					        model_iterations = pathlib.Path(self.config.model_dir).glob('model_registrar-*.pt')
 | 
				
			||||||
        highest_iter = max([int(p.stem.split('-')[-1]) for p in model_iterations])
 | 
					        highest_iter = max([int(p.stem.split('-')[-1]) for p in model_iterations])
 | 
				
			||||||
 | 
					        logger.info(f"Loading model {highest_iter}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        model_registrar.load_models(iter_num=highest_iter)
 | 
					        model_registrar.load_models(iter_num=highest_iter)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -429,8 +421,8 @@ class PredictionServer:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # if self.config.center_data:
 | 
					            # if self.config.center_data:
 | 
				
			||||||
            #     prediction_dict, histories_dict, futures_dict = offset_trajectron_dict(prediction_dict, cx, cy), offset_trajectron_dict(histories_dict, cx, cy), offset_trajectron_dict(futures_dict, cx, cy)
 | 
					            #     prediction_dict, histories_dict, futures_dict = offset_trajectron_dict(prediction_dict, cx, cy), offset_trajectron_dict(histories_dict, cx, cy), offset_trajectron_dict(futures_dict, cx, cy)
 | 
				
			||||||
            print('pred timesteps', list(prediction_dict.keys()))
 | 
					            # print('pred timesteps', list(prediction_dict.keys()))
 | 
				
			||||||
            print('histories', [n.data.data.shape[0] for n in prediction_dict[frame.index].keys()])
 | 
					            # print('histories', [n.data.data.shape[0] for n in prediction_dict[frame.index].keys()])
 | 
				
			||||||
            if self.config.cm_to_m:
 | 
					            if self.config.cm_to_m:
 | 
				
			||||||
                # convert back to fit homography
 | 
					                # convert back to fit homography
 | 
				
			||||||
                prediction_dict, histories_dict, futures_dict = prediction_m_to_cm(prediction_dict), prediction_m_to_cm(histories_dict), prediction_m_to_cm(futures_dict)
 | 
					                prediction_dict, histories_dict, futures_dict = prediction_m_to_cm(prediction_dict), prediction_m_to_cm(histories_dict), prediction_m_to_cm(futures_dict)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,6 +1,7 @@
 | 
				
			||||||
from collections import defaultdict
 | 
					from collections import defaultdict
 | 
				
			||||||
import datetime
 | 
					import datetime
 | 
				
			||||||
from pathlib import Path
 | 
					from pathlib import Path
 | 
				
			||||||
 | 
					from random import shuffle
 | 
				
			||||||
import sys
 | 
					import sys
 | 
				
			||||||
import os
 | 
					import os
 | 
				
			||||||
import time
 | 
					import time
 | 
				
			||||||
| 
						 | 
					@ -14,7 +15,7 @@ from typing import List
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from trap.config import CameraAction, HomographyAction
 | 
					from trap.config import CameraAction, HomographyAction
 | 
				
			||||||
from trap.frame_emitter import Camera
 | 
					from trap.frame_emitter import Camera
 | 
				
			||||||
from trap.tracker import Smoother, TrackReader
 | 
					from trap.tracker import FinalDisplacementFilter, Smoother, TrackReader
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#sys.path.append("../../")
 | 
					#sys.path.append("../../")
 | 
				
			||||||
from trajectron.environment import Environment, Scene, Node
 | 
					from trajectron.environment import Environment, Scene, Node
 | 
				
			||||||
| 
						 | 
					@ -28,7 +29,7 @@ state_dim = 6
 | 
				
			||||||
frame_diff = 10
 | 
					frame_diff = 10
 | 
				
			||||||
desired_frame_diff = 1
 | 
					desired_frame_diff = 1
 | 
				
			||||||
dt = 1/FPS # dt per frame (e.g. 1/FPS)
 | 
					dt = 1/FPS # dt per frame (e.g. 1/FPS)
 | 
				
			||||||
smooth_window = FPS * 1.5 # see also tracker.py
 | 
					smooth_window = FPS # see also tracker.py
 | 
				
			||||||
min_track_length = 20
 | 
					min_track_length = 20
 | 
				
			||||||
 | 
					
 | 
				
			||||||
standardization = {
 | 
					standardization = {
 | 
				
			||||||
| 
						 | 
					@ -80,7 +81,7 @@ class TrackIteration:
 | 
				
			||||||
# maybe_makedirs('trajectron-data')
 | 
					# maybe_makedirs('trajectron-data')
 | 
				
			||||||
# for desired_source in [ 'hof2', ]:#  ,'hof-maskrcnn', 'hof-yolov8', 'VIRAT-0102-parsed', 'virat-resnet-keypoints-full']:
 | 
					# for desired_source in [ 'hof2', ]:#  ,'hof-maskrcnn', 'hof-yolov8', 'VIRAT-0102-parsed', 'virat-resnet-keypoints-full']:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def process_data(src_dir: Path, dst_dir: Path, name: str, smooth_tracks: bool, cm_to_m: bool, center_data: bool, bin_positions: bool, camera: Camera, step_size: int):
 | 
					def process_data(src_dir: Path, dst_dir: Path, name: str, smooth_tracks: bool, cm_to_m: bool, center_data: bool, bin_positions: bool, camera: Camera, step_size: int, filter_displacement:float):
 | 
				
			||||||
    name += f"-{datetime.date.today()}"
 | 
					    name += f"-{datetime.date.today()}"
 | 
				
			||||||
    print(f"Process data in {src_dir}, to {dst_dir}, identified by {name}")
 | 
					    print(f"Process data in {src_dir}, to {dst_dir}, identified by {name}")
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
| 
						 | 
					@ -90,11 +91,15 @@ def process_data(src_dir: Path, dst_dir: Path, name: str, smooth_tracks: bool, c
 | 
				
			||||||
    skipped_for_error = 0
 | 
					    skipped_for_error = 0
 | 
				
			||||||
    created = 0
 | 
					    created = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    smoother = Smoother(window_len=smooth_window, convolution=False) if smooth_tracks else None
 | 
					    smoother = Smoother(window_len=smooth_window, convolution=True) if smooth_tracks else None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    reader = TrackReader(src_dir, camera.fps)
 | 
					    reader = TrackReader(src_dir, camera.fps)
 | 
				
			||||||
 | 
					    tracks = [t for t in reader]
 | 
				
			||||||
 | 
					    if filter_displacement > 0:
 | 
				
			||||||
 | 
					        filter = FinalDisplacementFilter(filter_displacement)
 | 
				
			||||||
 | 
					        tracks = filter.apply(tracks, camera)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    total = len(reader)
 | 
					    total = len(tracks)
 | 
				
			||||||
    bar = tqdm.tqdm(total=total)
 | 
					    bar = tqdm.tqdm(total=total)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    destinations = {
 | 
					    destinations = {
 | 
				
			||||||
| 
						 | 
					@ -108,13 +113,21 @@ def process_data(src_dir: Path, dst_dir: Path, name: str, smooth_tracks: bool, c
 | 
				
			||||||
    print(max_frame_nr)
 | 
					    print(max_frame_nr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # separate call so cursor is kept during multiple loops
 | 
					    # separate call so cursor is kept during multiple loops
 | 
				
			||||||
    track_iterator = iter(reader)
 | 
					    shuffle(tracks)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    dt1 = RollingAverage()
 | 
					    dt1 = RollingAverage()
 | 
				
			||||||
    dt2 = RollingAverage()
 | 
					    dt2 = RollingAverage()
 | 
				
			||||||
    dt3 = RollingAverage()
 | 
					    dt3 = RollingAverage()
 | 
				
			||||||
    dt4 = RollingAverage()
 | 
					    dt4 = RollingAverage()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    sets = {}
 | 
				
			||||||
 | 
					    offset = 0
 | 
				
			||||||
 | 
					    for data_class, nr in destinations.items():
 | 
				
			||||||
 | 
					        # TODO)) think of a way to shuffle while keeping scenes
 | 
				
			||||||
 | 
					        sets[data_class] = tracks[offset : offset+nr]
 | 
				
			||||||
 | 
					        offset += nr
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    print(f"Camera FPS: {camera.fps}, actual fps: {camera.fps/step_size} (or {(1/camera.fps)*step_size})")
 | 
					    print(f"Camera FPS: {camera.fps}, actual fps: {camera.fps/step_size} (or {(1/camera.fps)*step_size})")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for data_class, nr_of_items in destinations.items():
 | 
					    for data_class, nr_of_items in destinations.items():
 | 
				
			||||||
| 
						 | 
					@ -135,7 +148,7 @@ def process_data(src_dir: Path, dst_dir: Path, name: str, smooth_tracks: bool, c
 | 
				
			||||||
        scene_nodes = defaultdict(lambda: [])
 | 
					        scene_nodes = defaultdict(lambda: [])
 | 
				
			||||||
        iterations = TrackIteration.iteration_variations(smooth_tracks, False, step_size)
 | 
					        iterations = TrackIteration.iteration_variations(smooth_tracks, False, step_size)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for i, track in zip(range(nr_of_items), track_iterator):
 | 
					        for i, track in enumerate(sets[data_class]):
 | 
				
			||||||
                bar.update()
 | 
					                bar.update()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                track_source = track.source
 | 
					                track_source = track.source
 | 
				
			||||||
| 
						 | 
					@ -179,7 +192,7 @@ def process_data(src_dir: Path, dst_dir: Path, name: str, smooth_tracks: bool, c
 | 
				
			||||||
                    # track.get_projected_history(H=None, camera=self.config.camera)
 | 
					                    # track.get_projected_history(H=None, camera=self.config.camera)
 | 
				
			||||||
                    node = track.to_trajectron_node(camera, env)
 | 
					                    node = track.to_trajectron_node(camera, env)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    d = time.time()
 | 
					                    data_class = time.time()
 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
                    # if center_data:
 | 
					                    # if center_data:
 | 
				
			||||||
                    #     data['pos_x'] -= cx
 | 
					                    #     data['pos_x'] -= cx
 | 
				
			||||||
| 
						 | 
					@ -198,13 +211,22 @@ def process_data(src_dir: Path, dst_dir: Path, name: str, smooth_tracks: bool, c
 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
                    dt1.add(b-a)
 | 
					                    dt1.add(b-a)
 | 
				
			||||||
                    dt2.add(c-b)
 | 
					                    dt2.add(c-b)
 | 
				
			||||||
                    dt3.add(d-c)
 | 
					                    dt3.add(data_class-c)
 | 
				
			||||||
                    dt4.add(e-d)
 | 
					                    dt4.add(e-data_class)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for scene_nr, nodes in scene_nodes.items():            
 | 
					        for scene_nr, nodes in scene_nodes.items():            
 | 
				
			||||||
            scene = Scene(timesteps=nodes[-1].last_timestep, dt=(1/camera.fps)*step_size, name=f'{split_id}_{scene_nr}', aug_func=None)
 | 
					            first_ts = min([n.first_timestep for n in nodes])
 | 
				
			||||||
 | 
					            for node in nodes:
 | 
				
			||||||
 | 
					                node.first_timestep -= (first_ts - 1)
 | 
				
			||||||
 | 
					            last_ts = max([n.last_timestep for n in nodes])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # print(sorted([n.first_timestep for n in nodes]))
 | 
				
			||||||
 | 
					            scene = Scene(timesteps=last_ts, dt=(1/camera.fps)*step_size, name=f'{split_id}_{scene_nr}', aug_func=None)
 | 
				
			||||||
            scene.nodes.extend(nodes)
 | 
					            scene.nodes.extend(nodes)
 | 
				
			||||||
            scenes.append(scene)
 | 
					            scenes.append(scene)
 | 
				
			||||||
 | 
					            # print(scene)
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            # print(scene.nodes[0].first_timestep)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        print(f'Processed {len(scenes):.2f} scene for data class {data_class}')
 | 
					        print(f'Processed {len(scenes):.2f} scene for data class {data_class}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -244,6 +266,11 @@ def main():
 | 
				
			||||||
                    # type=Path,
 | 
					                    # type=Path,
 | 
				
			||||||
                    default=None,
 | 
					                    default=None,
 | 
				
			||||||
                    action=CameraAction)
 | 
					                    action=CameraAction)
 | 
				
			||||||
 | 
					    parser.add_argument("--filter-displacement",
 | 
				
			||||||
 | 
					                    help="Filter tracks with a final displacement less then the given value",
 | 
				
			||||||
 | 
					                    # type=Path,
 | 
				
			||||||
 | 
					                    default=0,
 | 
				
			||||||
 | 
					                    type=float)
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    args = parser.parse_args()
 | 
					    args = parser.parse_args()
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
| 
						 | 
					@ -257,6 +284,7 @@ def main():
 | 
				
			||||||
        args.center_data,
 | 
					        args.center_data,
 | 
				
			||||||
        args.bin_positions,
 | 
					        args.bin_positions,
 | 
				
			||||||
        args.camera,
 | 
					        args.camera,
 | 
				
			||||||
        args.step_size
 | 
					        args.step_size,
 | 
				
			||||||
 | 
					        filter_displacement=args.filter_displacement
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -25,7 +25,7 @@ from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack
 | 
				
			||||||
from ultralytics import YOLO
 | 
					from ultralytics import YOLO
 | 
				
			||||||
from ultralytics.engine.results import Results as YOLOResult
 | 
					from ultralytics.engine.results import Results as YOLOResult
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from trap.frame_emitter import DetectionState, Frame, Detection, Track
 | 
					from trap.frame_emitter import Camera, DataclassJSONEncoder, DetectionState, Frame, Detection, Track
 | 
				
			||||||
from bytetracker import BYTETracker
 | 
					from bytetracker import BYTETracker
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from tsmoothie.smoother import KalmanSmoother, ConvolutionSmoother
 | 
					from tsmoothie.smoother import KalmanSmoother, ConvolutionSmoother
 | 
				
			||||||
| 
						 | 
					@ -93,14 +93,33 @@ class Multifile():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
FIELDNAMES = ['frame_id', 'track_id', 'l', 't', 'w', 'h', 'x', 'y', 'state', 'source']
 | 
					FIELDNAMES = ['frame_id', 'track_id', 'l', 't', 'w', 'h', 'x', 'y', 'state', 'source']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class TrackFilter:
 | 
				
			||||||
 | 
					    pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def apply(self, tracks: List[Track], camera: Camera):
 | 
				
			||||||
 | 
					        return [t for t in tracks if self.filter(t, camera)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class FinalDisplacementFilter(TrackFilter):
 | 
				
			||||||
 | 
					    def __init__(self, min_displacement):
 | 
				
			||||||
 | 
					        self.min_displacement = min_displacement
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    def filter(self, track: Track, camera: Camera):
 | 
				
			||||||
 | 
					        history = track.get_projected_history(H=None, camera=camera)
 | 
				
			||||||
 | 
					        displacement = np.linalg.norm(history[0]-history[-1])
 | 
				
			||||||
 | 
					        return displacement > self.min_displacement
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class TrackReader:
 | 
					class TrackReader:
 | 
				
			||||||
    def __init__(self, path: Path, fps: int, include_blacklisted = False, exclude_whitelisted = False):
 | 
					    def __init__(self, path: Path, fps: int, include_blacklisted = False, exclude_whitelisted = False):
 | 
				
			||||||
        self.blacklist_file = path / "blacklist.jsonl"
 | 
					        self.blacklist_file = path / "blacklist.jsonl"
 | 
				
			||||||
        self.whitelist_file = path / "whitelist.jsonl" # for skipping
 | 
					        self.whitelist_file = path / "whitelist.jsonl" # for skipping
 | 
				
			||||||
        self.tracks_file = path / "tracks.json"
 | 
					        self.tracks_file = path / "tracks.pkl"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # with self.tracks_file.open('r') as fp:
 | 
				
			||||||
 | 
					        #     tracks_dict: dict = json.load(fp)
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					        with self.tracks_file.open('rb') as fp:
 | 
				
			||||||
 | 
					            tracks: dict = pickle.load(fp)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        with self.tracks_file.open('r') as fp:
 | 
					 | 
				
			||||||
            tracks_dict: dict = json.load(fp)
 | 
					 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        if self.blacklist_file.exists():
 | 
					        if self.blacklist_file.exists():
 | 
				
			||||||
            with jsonlines.open(self.blacklist_file, 'r') as reader:
 | 
					            with jsonlines.open(self.blacklist_file, 'r') as reader:
 | 
				
			||||||
| 
						 | 
					@ -117,7 +136,7 @@ class TrackReader:
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self._tracks = { track_id: detection_values
 | 
					        self._tracks = { track_id: detection_values
 | 
				
			||||||
            for track_id, detection_values in tracks_dict.items()
 | 
					            for track_id, detection_values in tracks.items()
 | 
				
			||||||
            if (include_blacklisted or track_id not in blacklist) and
 | 
					            if (include_blacklisted or track_id not in blacklist) and
 | 
				
			||||||
                (not exclude_whitelisted or track_id not in whitelist)
 | 
					                (not exclude_whitelisted or track_id not in whitelist)
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
| 
						 | 
					@ -127,26 +146,27 @@ class TrackReader:
 | 
				
			||||||
        return len(self._tracks)
 | 
					        return len(self._tracks)
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    def get(self, track_id):
 | 
					    def get(self, track_id):
 | 
				
			||||||
        detection_values = self._tracks[track_id]
 | 
					        return self._tracks[track_id]
 | 
				
			||||||
        history = []
 | 
					        # detection_values = self._tracks[track_id]
 | 
				
			||||||
        # for detection_values in 
 | 
					        # history = []
 | 
				
			||||||
        source = None
 | 
					        # # for detection_values in 
 | 
				
			||||||
        for detection_items in detection_values:
 | 
					        # source = None
 | 
				
			||||||
            d = dict(zip(FIELDNAMES, detection_items))
 | 
					        # for detection_items in detection_values:
 | 
				
			||||||
            history.append(Detection(
 | 
					        #     d = dict(zip(FIELDNAMES, detection_items))
 | 
				
			||||||
                d['track_id'],
 | 
					        #     history.append(Detection(
 | 
				
			||||||
                d['l'],
 | 
					        #         d['track_id'],
 | 
				
			||||||
                d['t'],
 | 
					        #         d['l'],
 | 
				
			||||||
                d['w'],
 | 
					        #         d['t'],
 | 
				
			||||||
                d['h'],
 | 
					        #         d['w'],
 | 
				
			||||||
                nan,
 | 
					        #         d['h'],
 | 
				
			||||||
                d['state'],
 | 
					        #         nan,
 | 
				
			||||||
                d['frame_id'],
 | 
					        #         d['state'],
 | 
				
			||||||
                1,
 | 
					        #         d['frame_id'],
 | 
				
			||||||
            ))
 | 
					        #         1,
 | 
				
			||||||
            source = int(d['source'])
 | 
					        #     ))
 | 
				
			||||||
 | 
					        #     source = int(d['source'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return Track(track_id, history, fps=self.fps, source=source)
 | 
					        # return Track(track_id, history, fps=self.fps, source=source)
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
    def __iter__(self):
 | 
					    def __iter__(self):
 | 
				
			||||||
        for track_id in self._tracks:
 | 
					        for track_id in self._tracks:
 | 
				
			||||||
| 
						 | 
					@ -239,7 +259,8 @@ def rewrite_raw_track_files(path: Path):
 | 
				
			||||||
    # for source_file in source_files:
 | 
					    # for source_file in source_files:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    tracks_file = path / 'tracks.json'
 | 
					    tracks_file = path / 'tracks.json'
 | 
				
			||||||
    tracks = defaultdict(lambda: [])
 | 
					    tracks_pkl = path / 'tracks.pkl'
 | 
				
			||||||
 | 
					    tracks = defaultdict(lambda: Track())
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    offset = 0
 | 
					    offset = 0
 | 
				
			||||||
    max_track_id = 0
 | 
					    max_track_id = 0
 | 
				
			||||||
| 
						 | 
					@ -285,18 +306,31 @@ def rewrite_raw_track_files(path: Path):
 | 
				
			||||||
                if track_id > max_track_id:
 | 
					                if track_id > max_track_id:
 | 
				
			||||||
                    max_track_id = track_id
 | 
					                    max_track_id = track_id
 | 
				
			||||||
                
 | 
					                
 | 
				
			||||||
                parts[1] = str(track_id)
 | 
					                track_id = str(track_id)
 | 
				
			||||||
                target_fp.write("\t".join(parts))
 | 
					                target_fp.write("\t".join(parts))
 | 
				
			||||||
                
 | 
					                
 | 
				
			||||||
                parts = [float(p) for p in parts]
 | 
					                parts = [float(p) for p in parts]
 | 
				
			||||||
                tracks[track_id].append([
 | 
					                #  ['frame_id', 'track_id', 'l', 't', 'w', 'h', 'x', 'y', 'state', 'source']
 | 
				
			||||||
                    int(parts[0] / 10),
 | 
					                
 | 
				
			||||||
                    track_id,
 | 
					                point = Detection(track_id, parts[2], parts[3], parts[4], parts[5], 1, DetectionState(int(parts[8])), int(parts[0]/10), 1)
 | 
				
			||||||
                    ] + parts[2:8] + [int(parts[8]), src_file_nr])
 | 
					                # history = [
 | 
				
			||||||
 | 
					                     
 | 
				
			||||||
 | 
					                #     for d in parts]
 | 
				
			||||||
 | 
					                tracks[track_id].track_id = track_id
 | 
				
			||||||
 | 
					                tracks[track_id].source = src_file_nr
 | 
				
			||||||
 | 
					                tracks[track_id].history.append(point)
 | 
				
			||||||
 | 
					                # tracks[track_id].append([
 | 
				
			||||||
 | 
					                #     int(parts[0] / 10),
 | 
				
			||||||
 | 
					                #     track_id,
 | 
				
			||||||
 | 
					                #     ] + parts[2:8] + [int(parts[8]), src_file_nr])
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    with tracks_file.open('w') as fp:
 | 
					    with tracks_file.open('w') as fp:
 | 
				
			||||||
        logger.info(f"Write {len(tracks)} tracks to {str(tracks_file)}")
 | 
					        logger.info(f"Write {len(tracks)} tracks to {str(tracks_file)}")
 | 
				
			||||||
        json.dump(tracks, fp)
 | 
					        json.dump(tracks, fp, cls=DataclassJSONEncoder, indent=2)
 | 
				
			||||||
 | 
					    with tracks_pkl.open('wb') as fp:
 | 
				
			||||||
 | 
					        logger.info(f"Write {len(tracks)} tracks to {str(tracks_pkl)}")
 | 
				
			||||||
 | 
					        pickle.dump(dict(tracks), fp)
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class TrackerWrapper():
 | 
					class TrackerWrapper():
 | 
				
			||||||
| 
						 | 
					@ -641,7 +675,7 @@ class Smoother:
 | 
				
			||||||
    def __init__(self, window_len=6, convolution=False):
 | 
					    def __init__(self, window_len=6, convolution=False):
 | 
				
			||||||
        # for some reason this smoother messes the predictions. Probably skews the points too much??
 | 
					        # for some reason this smoother messes the predictions. Probably skews the points too much??
 | 
				
			||||||
        if convolution:
 | 
					        if convolution:
 | 
				
			||||||
            self.smoother = ConvolutionSmoother(window_len=window_len, window_type='ones', copy=None)
 | 
					            self.smoother = ConvolutionSmoother(window_len=window_len, window_type='hanning', copy=None)
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            # "Unlike Kalman filtering, which focuses on predicting and updating the current state using historical measurements, Kalman smoothing enhances the accuracy of past state values"
 | 
					            # "Unlike Kalman filtering, which focuses on predicting and updating the current state using historical measurements, Kalman smoothing enhances the accuracy of past state values"
 | 
				
			||||||
            # see https://medium.com/@shahalkp1/kalman-smoothing-using-tsmoothie-0175260464e5
 | 
					            # see https://medium.com/@shahalkp1/kalman-smoothing-using-tsmoothie-0175260464e5
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1,3 +1,4 @@
 | 
				
			||||||
 | 
					# lerp & inverse lerp from https://gist.github.com/laundmo/b224b1f4c8ef6ca5fe47e132c8deab56
 | 
				
			||||||
def lerp(a: float, b: float, t: float) -> float:
 | 
					def lerp(a: float, b: float, t: float) -> float:
 | 
				
			||||||
    """Linear interpolate on the scale given by a to b, using t as the point on that scale.
 | 
					    """Linear interpolate on the scale given by a to b, using t as the point on that scale.
 | 
				
			||||||
    Examples
 | 
					    Examples
 | 
				
			||||||
| 
						 | 
					@ -6,3 +7,13 @@ def lerp(a: float, b: float, t: float) -> float:
 | 
				
			||||||
        4.2 == lerp(1, 5, 0.8)
 | 
					        4.2 == lerp(1, 5, 0.8)
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    return (1 - t) * a + t * b
 | 
					    return (1 - t) * a + t * b
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def inv_lerp(a: float, b: float, v: float) -> float:
 | 
				
			||||||
 | 
					    """Inverse Linar Interpolation, get the fraction between a and b on which v resides.
 | 
				
			||||||
 | 
					    Examples
 | 
				
			||||||
 | 
					    --------
 | 
				
			||||||
 | 
					        0.5 == inv_lerp(0, 100, 50)
 | 
				
			||||||
 | 
					        0.8 == inv_lerp(1, 5, 4.2)
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    return (v - a) / (b - a)
 | 
				
			||||||
		Loading…
	
		Reference in a new issue