trajpred/predict_path.ipynb
In [4]:
import cv2
from pathlib import Path
import numpy as np
# from PIL import Image
import torch
from torchvision.io.video import read_video
import matplotlib.pyplot as plt
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image
from torchvision.models.detection import retinanet_resnet50_fpn_v2, RetinaNet_ResNet50_FPN_V2_Weights
In [5]:
source = Path('../DATASETS/VIRAT_subset_0102x')
videos = source.glob('*.mp4')
homography = list(source.glob('*img2world.txt'))[0]
H = np.loadtxt(homography, delimiter=',')

The homography matrix maps points from image space onto a flat, scene-dependent world plane. The README_homography.txt from VIRAT describes it as follows:

Roughly estimated 3-by-3 homographies are included for convenience. Each homography H provides a mapping from image coordinate to scene-dependent world coordinate.
[xw,yw,zw]' = H*[xi,yi,1]'

xi: horizontal axis on image with left top corner as origin, increases right. yi: vertical axis on image with left top corner as origin, increases downward.

xw/zw: world x coordinate. yw/zw: world y coordinate.

In [6]:
# quick check: map an image point into (unnormalised) homogeneous world coordinates
# H.dot(np.array([20,300, 1]))
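Since H maps into homogeneous coordinates, the result still has to be divided by zw to get the flat world position. A minimal sketch of that normalisation (the pixel coordinates are only an example):

In [ ]:
def img_to_world(H, xi, yi):
    """Map an image point (xi, yi) to world coordinates via the homography H."""
    xw, yw, zw = H @ np.array([xi, yi, 1.0])
    return xw / zw, yw / zw

# e.g. the point from the commented-out check above
img_to_world(H, 20, 300)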
In [7]:
video_path = list(videos)[0]
# for now, hard-code a specific clip from the subset
video_path = Path("../DATASETS/VIRAT_subset_0102x/VIRAT_S_010200_00_000060_000218.mp4")
In [8]:
video_path
Out[8]:
PosixPath('../DATASETS/VIRAT_subset_0102x/VIRAT_S_010200_00_000060_000218.mp4')
In [9]:
weights = RetinaNet_ResNet50_FPN_V2_Weights.DEFAULT
model = retinanet_resnet50_fpn_v2(weights=weights, score_thresh=0.35)
# Put the model in inference mode
model.eval()
# Get the transforms for the model's weights
preprocess = weights.transforms()
In [10]:
# hub.set_dir()
In [11]:
video = cv2.VideoCapture(str(video_path))

The score_thresh argument sets the detection confidence threshold: an object is only reported as belonging to a class if the model is at least 35% confident in that classification.

The result from a single prediction coming from model(batch) looks like:

{'boxes': tensor([[5.7001e+02, 2.5786e+02, 6.3138e+02, 3.6970e+02],
         [5.0109e+02, 2.4508e+02, 5.5308e+02, 3.4852e+02],
         [3.4096e+02, 2.7015e+02, 3.6156e+02, 3.1857e+02],
         [5.0219e-01, 3.7588e+02, 9.7911e+01, 7.2000e+02],
         [3.4096e+02, 2.7015e+02, 3.6156e+02, 3.1857e+02],
         [8.3241e+01, 5.8410e+02, 1.7502e+02, 7.1743e+02]]),
 'scores': tensor([0.8525, 0.6491, 0.5985, 0.4999, 0.3753, 0.3746]),
 'labels': tensor([64, 64,  1, 64, 18, 86])}
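The labels are COCO category indices; weights.meta["categories"] maps them back to names, with index 1 being 'person'. A minimal sketch of reducing such a prediction to reasonably confident person detections (the extra 0.5 score cutoff is an arbitrary value for illustration):

In [ ]:
def filter_prediction(prediction, category_id=1, min_score=0.5):
    """Keep only detections of one category above a score threshold."""
    mask = (prediction['labels'] == category_id) & (prediction['scores'] >= min_score)
    return {key: value[mask] for key, value in prediction.items()}

# filter_prediction(prediction)['boxes']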
In [37]:
# TODO: process all frames in a loop (it currently breaks after the first frame)
%matplotlib inline


import pylab as pl
from IPython import display

i=0
while True:
    ret, frame = video.read()
    i+=1
    
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break

    # convert the frame from BGR (OpenCV) to RGB and into a channels-first uint8 tensor
    t = torch.from_numpy(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    t = t.permute(2, 0, 1)

    batch = [preprocess(t)]
    # no_grad can be used on inference, should be slightly faster
    with torch.no_grad():
        predictions = model(batch)
    prediction = predictions[0] # we feed only one frame at a time

    mask = prediction['labels'] == 1 # if we want more than one: np.isin(prediction['labels'], [1,86])

    scores = prediction['scores'][mask]
    labels = prediction['labels'][mask]
    boxes = prediction['boxes'][mask]
    
    # TODO: introduce confidence and NMS suppression: https://github.com/cfotache/pytorch_objectdetecttrack/blob/master/PyTorch_Object_Tracking.ipynb
    # (which I _think_ we better do after filtering)
    # alternatively look at Soft-NMS https://towardsdatascience.com/non-maximum-suppression-nms-93ce178e177c

    labels = [weights.meta["categories"][i] for i in labels]

    box = draw_bounding_boxes(t, boxes=boxes,
                            labels=labels,
                            colors="cyan",
                            width=2, 
                            font_size=30,
                            font='Arial')

    im = to_pil_image(box.detach())

    display.display(im, f"frame {i}")
    print(prediction)
    display.clear_output(wait=True)

    break # for now
    # pl.clf()
    # # pl.plot(pl.randn(100))
    # pl.figure(figsize=(24,50))
    # # fig.axes[0].imshow(img)
    # pl.imshow(im)
    # display.display(pl.gcf(), f"frame {i}")
    # display.clear_output(wait=True)
    # time.sleep(1.0)

    # fig, ax = plt.subplots(figsize=(16, 12))
    # ax.imshow(im)
    # plt.show()
'frame 1'
{'boxes': tensor([[5.6998e+02, 2.5778e+02, 6.3132e+02, 3.6969e+02],
        [5.0109e+02, 2.4507e+02, 5.5308e+02, 3.4848e+02],
        [5.4706e-01, 3.7548e+02, 9.8450e+01, 7.2000e+02],
        [3.4061e+02, 2.7014e+02, 3.6137e+02, 3.1858e+02],
        [3.4061e+02, 2.7014e+02, 3.6137e+02, 3.1858e+02],
        [8.3206e+01, 5.8410e+02, 1.7512e+02, 7.1747e+02]]), 'scores': tensor([0.8500, 0.6467, 0.4990, 0.4889, 0.4773, 0.3656]), 'labels': tensor([64, 64, 64, 18,  1, 86])}
In [36]:
prediction
Out[36]:
{'boxes': tensor([[5.7001e+02, 2.5786e+02, 6.3138e+02, 3.6970e+02],
         [5.0109e+02, 2.4508e+02, 5.5308e+02, 3.4852e+02],
         [3.4096e+02, 2.7015e+02, 3.6156e+02, 3.1857e+02],
         [5.0219e-01, 3.7588e+02, 9.7911e+01, 7.2000e+02],
         [3.4096e+02, 2.7015e+02, 3.6156e+02, 3.1857e+02],
         [8.3241e+01, 5.8410e+02, 1.7502e+02, 7.1743e+02]]),
 'scores': tensor([0.8525, 0.6491, 0.5985, 0.4999, 0.3753, 0.3746]),
 'labels': tensor([64, 64,  1, 64, 18, 86])}
In [21]:
prediction['labels'] == 1
Out[21]:
tensor([False, False,  True, False, False, False])
In [23]:
prediction['boxes'][prediction['labels'] == 1]
prediction['scores'][prediction['labels'] == 1]
Out[23]:
tensor([0.5985])
In [35]:

Out[35]:
tensor([[340.9556, 270.1501, 361.5573, 318.5745],
        [ 83.2414, 584.1043, 175.0199, 717.4326]])
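The TODO in the detection loop above mentions confidence filtering and NMS suppression. torchvision already ships a standard NMS operator; a minimal sketch of applying it to the filtered boxes (the IoU threshold of 0.5 is an arbitrary value for illustration):

In [ ]:
from torchvision.ops import nms

def suppress_overlaps(boxes, scores, labels, iou_threshold=0.5):
    """Drop lower-scoring boxes that overlap a higher-scoring box too much."""
    keep = nms(boxes, scores, iou_threshold)  # indices of the boxes to keep
    return boxes[keep], scores[keep], [labels[int(i)] for i in keep]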

Now with SORT tracking

Using a SORT implementation originally by Alex Bewley, in the adaptation by Chris Fotache. For an example of its use, see his notebook.

In [39]:
from sort_cfotache import Sort

mot_tracker = Sort()
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[39], line 1
----> 1 from sort_cfotache import Sort
      3 mot_tracker = Sort()

File ~/spul/Projecten/suspicion/trajpred/sort_cfotache.py:22
      1 """
      2     from: https://github.com/cfotache/pytorch_objectdetecttrack/blob/master/sort.py
      3     
   (...)
     18     along with this program.  If not, see <http://www.gnu.org/licenses/>.
     19 """
     20 from __future__ import print_function
---> 22 from numba import jit
     23 import os.path
     24 import numpy as np

ModuleNotFoundError: No module named 'numba'
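The import fails because sort_cfotache.py depends on numba (for its @jit decorator), as the traceback shows; installing numba in the environment should resolve this. In Bewley's reference implementation, Sort.update() takes a numpy array with one row per detection, [x1, y1, x2, y2, score], and returns one row per active track, [x1, y1, x2, y2, track_id]. A minimal sketch of that call, with made-up detection values:

In [ ]:
# assumes the import above succeeded, i.e. numba is installed
mot_tracker = Sort()

dets = np.array([
    [570.0, 258.0, 631.0, 370.0, 0.85],  # x1, y1, x2, y2, score
    [341.0, 270.0, 361.0, 318.0, 0.60],
])
tracks = mot_tracker.update(dets)  # rows of [x1, y1, x2, y2, track_id]
tracks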
In [38]:
# TODO: process all frames in a loop (it currently breaks after the first frame)
%matplotlib inline


import pylab as pl
from IPython import display

i=0
while True:
    ret, frame = video.read()
    i+=1
    
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break

    # convert the frame from BGR (OpenCV) to RGB and into a channels-first uint8 tensor
    t = torch.from_numpy(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    t = t.permute(2, 0, 1)

    batch = [preprocess(t)]
    # no_grad can be used on inference, should be slightly faster
    with torch.no_grad():
        predictions = model(batch)
    prediction = predictions[0] # we feed only one frame at a time

    mask = prediction['labels'] == 1 # if we want more than one: np.isin(prediction['labels'], [1,86])

    scores = prediction['scores'][mask]
    labels = prediction['labels'][mask]
    boxes = prediction['boxes'][mask]
    
    # TODO: introduce confidence and NMS suppression: https://github.com/cfotache/pytorch_objectdetecttrack/blob/master/PyTorch_Object_Tracking.ipynb
    # (which I _think_ we better do after filtering)
    # alternatively look at Soft-NMS https://towardsdatascience.com/non-maximum-suppression-nms-93ce178e177c

    labels = [weights.meta["categories"][i] for i in labels]
    
    #  dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
    detections = np.array([np.append(bbox, score) for bbox, score in zip(boxes, scores)])
    tracks = mot_tracker.update(detections)

    # convert back to boxes and labels; use `track` here, not `t`, which still holds the image tensor
    boxes = torch.from_numpy(np.array([track[:4] for track in tracks])).float()
    labels = [str(int(track[4])) for track in tracks]  # label each box with its track id

    box = draw_bounding_boxes(t, boxes=boxes,
                            labels=labels,
                            colors="cyan",
                            width=2, 
                            font_size=30,
                            font='Arial')

    im = to_pil_image(box.detach())

    display.display(im, f"frame {i}")
    print(prediction)
    display.clear_output(wait=True)

    break # for now
    # pl.clf()
    # # pl.plot(pl.randn(100))
    # pl.figure(figsize=(24,50))
    # # fig.axes[0].imshow(img)
    # pl.imshow(im)
    # display.display(pl.gcf(), f"frame {i}")
    # display.clear_output(wait=True)
    # time.sleep(1.0)

    # fig, ax = plt.subplots(figsize=(16, 12))
    # ax.imshow(im)
    # plt.show()
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[38], line 26
     24 # no_grad can be used on inference, should be slightly faster
     25 with torch.no_grad():
---> 26     predictions = model(batch)
     27 prediction = predictions[0] # we feed only one frame at the once
     29 mask = prediction['labels'] == 1 # if we want more than one: np.isin(prediction['labels'], [1,86])

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/module.py:1501, in Module._call_impl(self, *args, **kwargs)
   1496 # If we don't have any hooks, we want to skip the rest of the logic in
   1497 # this function, and just call forward.
   1498 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks
   1499         or _global_backward_pre_hooks or _global_backward_hooks
   1500         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1501     return forward_call(*args, **kwargs)
   1502 # Do not call functions when jit is used
   1503 full_backward_hooks, non_full_backward_hooks = [], []

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torchvision/models/detection/retinanet.py:625, in RetinaNet.forward(self, images, targets)
    618             torch._assert(
    619                 False,
    620                 "All bounding boxes should have positive height and width."
    621                 f" Found invalid box {degen_bb} for target at index {target_idx}.",
    622             )
    624 # get the features from the backbone
--> 625 features = self.backbone(images.tensors)
    626 if isinstance(features, torch.Tensor):
    627     features = OrderedDict([("0", features)])

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/module.py:1501, in Module._call_impl(self, *args, **kwargs)
   1496 # If we don't have any hooks, we want to skip the rest of the logic in
   1497 # this function, and just call forward.
   1498 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks
   1499         or _global_backward_pre_hooks or _global_backward_hooks
   1500         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1501     return forward_call(*args, **kwargs)
   1502 # Do not call functions when jit is used
   1503 full_backward_hooks, non_full_backward_hooks = [], []

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torchvision/models/detection/backbone_utils.py:57, in BackboneWithFPN.forward(self, x)
     56 def forward(self, x: Tensor) -> Dict[str, Tensor]:
---> 57     x = self.body(x)
     58     x = self.fpn(x)
     59     return x

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/module.py:1501, in Module._call_impl(self, *args, **kwargs)
   1496 # If we don't have any hooks, we want to skip the rest of the logic in
   1497 # this function, and just call forward.
   1498 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks
   1499         or _global_backward_pre_hooks or _global_backward_hooks
   1500         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1501     return forward_call(*args, **kwargs)
   1502 # Do not call functions when jit is used
   1503 full_backward_hooks, non_full_backward_hooks = [], []

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torchvision/models/_utils.py:69, in IntermediateLayerGetter.forward(self, x)
     67 out = OrderedDict()
     68 for name, module in self.items():
---> 69     x = module(x)
     70     if name in self.return_layers:
     71         out_name = self.return_layers[name]

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/module.py:1501, in Module._call_impl(self, *args, **kwargs)
   1496 # If we don't have any hooks, we want to skip the rest of the logic in
   1497 # this function, and just call forward.
   1498 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks
   1499         or _global_backward_pre_hooks or _global_backward_hooks
   1500         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1501     return forward_call(*args, **kwargs)
   1502 # Do not call functions when jit is used
   1503 full_backward_hooks, non_full_backward_hooks = [], []

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/container.py:217, in Sequential.forward(self, input)
    215 def forward(self, input):
    216     for module in self:
--> 217         input = module(input)
    218     return input

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/module.py:1501, in Module._call_impl(self, *args, **kwargs)
   1496 # If we don't have any hooks, we want to skip the rest of the logic in
   1497 # this function, and just call forward.
   1498 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks
   1499         or _global_backward_pre_hooks or _global_backward_hooks
   1500         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1501     return forward_call(*args, **kwargs)
   1502 # Do not call functions when jit is used
   1503 full_backward_hooks, non_full_backward_hooks = [], []

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torchvision/models/resnet.py:150, in Bottleneck.forward(self, x)
    147 out = self.bn1(out)
    148 out = self.relu(out)
--> 150 out = self.conv2(out)
    151 out = self.bn2(out)
    152 out = self.relu(out)

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/module.py:1501, in Module._call_impl(self, *args, **kwargs)
   1496 # If we don't have any hooks, we want to skip the rest of the logic in
   1497 # this function, and just call forward.
   1498 if not (self._backward_hooks or self._backward_pre_hooks or self._forward_hooks or self._forward_pre_hooks
   1499         or _global_backward_pre_hooks or _global_backward_hooks
   1500         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1501     return forward_call(*args, **kwargs)
   1502 # Do not call functions when jit is used
   1503 full_backward_hooks, non_full_backward_hooks = [], []

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/conv.py:463, in Conv2d.forward(self, input)
    462 def forward(self, input: Tensor) -> Tensor:
--> 463     return self._conv_forward(input, self.weight, self.bias)

File ~/spul/Projecten/suspicion/trajpred/.venv/lib/python3.11/site-packages/torch/nn/modules/conv.py:459, in Conv2d._conv_forward(self, input, weight, bias)
    455 if self.padding_mode != 'zeros':
    456     return F.conv2d(F.pad(input, self._reversed_padding_repeated_twice, mode=self.padding_mode),
    457                     weight, bias, self.stride,
    458                     _pair(0), self.dilation, self.groups)
--> 459 return F.conv2d(input, weight, bias, self.stride,
    460                 self.padding, self.dilation, self.groups)

KeyboardInterrupt: 
In [ ]: