|
|
|
@ -5,6 +5,7 @@ import logging
@@ -5,6 +5,7 @@ import logging
|
|
|
|
|
import argparse |
|
|
|
|
import numpy as np |
|
|
|
|
import time |
|
|
|
|
import datetime |
|
|
|
|
from PIL import ImageFont, ImageDraw, Image |
|
|
|
|
import os |
|
|
|
|
|
|
|
|
@ -34,17 +35,22 @@ class Result():
@@ -34,17 +35,22 @@ class Result():
|
|
|
|
|
return self |
|
|
|
|
|
|
|
|
|
def draw_detections(self): |
|
|
|
|
color = draw_colors[self.algorithm] |
|
|
|
|
cv2_im_rgb = cv2.cvtColor(self.visualisation,cv2.COLOR_BGR2RGB) |
|
|
|
|
# Pass the image to PIL |
|
|
|
|
pil_im = Image.fromarray(cv2_im_rgb) |
|
|
|
|
draw = ImageDraw.Draw(pil_im, 'RGBA') |
|
|
|
|
|
|
|
|
|
self.draw_detections_on(draw) |
|
|
|
|
return cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR) |
|
|
|
|
|
|
|
|
|
def draw_detections_on(self, draw: ImageDraw): |
|
|
|
|
''' |
|
|
|
|
Draw on a specified canvas |
|
|
|
|
''' |
|
|
|
|
color = draw_colors[self.algorithm] |
|
|
|
|
for detection in self.detections: |
|
|
|
|
self.draw_detection(draw, detection, color) |
|
|
|
|
|
|
|
|
|
self.visualisation = cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR) |
|
|
|
|
|
|
|
|
|
def draw_detection(self, draw: ImageDraw, detection: dict, color: tuple): |
|
|
|
|
width = 2 |
|
|
|
|
|
|
|
|
@ -68,51 +74,8 @@ class Result():
@@ -68,51 +74,8 @@ class Result():
|
|
|
|
|
color = tuple(color) |
|
|
|
|
|
|
|
|
|
draw.rectangle((detection['startX'], detection['startY'], detection['endX'], detection['endY']), outline=color, width=width) |
|
|
|
|
# cv2.rectangle(rect_img, (0, 0), |
|
|
|
|
# (sub_img.shape[1]-int(width/2), sub_img.shape[0]-int(width/2)), |
|
|
|
|
# color, width) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def draw_detections_cv2(self): |
|
|
|
|
color = draw_colors[self.algorithm] |
|
|
|
|
for detection in self.detections: |
|
|
|
|
self.draw_detection(detection, color) |
|
|
|
|
|
|
|
|
|
def draw_detection_cv2(self, detection, color=(0,0,255)): |
|
|
|
|
|
|
|
|
|
# First we crop the sub-rect from the image |
|
|
|
|
sub_img = self.visualisation[detection['startY']:detection['endY'], detection['startX']:detection['endX']] |
|
|
|
|
rect_img = sub_img.copy() |
|
|
|
|
width = 2 |
|
|
|
|
cv2.rectangle(rect_img, (0, 0), |
|
|
|
|
(sub_img.shape[1]-int(width/2), sub_img.shape[0]-int(width/2)), |
|
|
|
|
color, width) |
|
|
|
|
# white_rect = np.ones(sub_img.shape, dtype=np.uint8) * 255 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# filter out weak detections by ensuring the `confidence` is |
|
|
|
|
# greater than the minimum confidence |
|
|
|
|
if detection['confidence'] > self.confidence_threshold: |
|
|
|
|
# draw the bounding box of the face along with the associated |
|
|
|
|
# probability |
|
|
|
|
text = "{:.2f}%".format(detection['confidence'] * 100) |
|
|
|
|
y = detection['startY'] - 10 if detection['startY'] - 10 > 10 else detection['startY'] + 10 |
|
|
|
|
# cv2.rectangle(image, (startX, startY), (endX, endY), |
|
|
|
|
# color, 2) |
|
|
|
|
cv2.putText(self.visualisation, text, (detection['startX'], y), |
|
|
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2, lineType = cv2.LINE_AA) |
|
|
|
|
|
|
|
|
|
alpha = 1 |
|
|
|
|
else: |
|
|
|
|
# At least 10% opacity |
|
|
|
|
alpha = max(.3, detection['confidence']) |
|
|
|
|
|
|
|
|
|
res = cv2.addWeighted(sub_img, 1-alpha, rect_img, alpha, 1.0) |
|
|
|
|
|
|
|
|
|
# Putting the image back to its position |
|
|
|
|
self.visualisation[detection['startY']:detection['endY'], detection['startX']:detection['endX']] = res |
|
|
|
|
|
|
|
|
|
def resize(self, width, height): |
|
|
|
|
# TODO resize to new target incl all detections |
|
|
|
|
img = self.visualisation |
|
|
|
@ -437,16 +400,22 @@ def process3_haar(in_q, out_q, cascade_file):
@@ -437,16 +400,22 @@ def process3_haar(in_q, out_q, cascade_file):
|
|
|
|
|
# print(img) |
|
|
|
|
out_q.put(result) |
|
|
|
|
|
|
|
|
|
def display(image_res, q1, q2, q3, q4, fullscreen = False): |
|
|
|
|
prev_image1 = np.zeros((image_res[1],image_res[0],3), np.uint8) |
|
|
|
|
prev_image2 = np.zeros((image_res[1],image_res[0],3), np.uint8) |
|
|
|
|
prev_image3 = np.zeros((image_res[1],image_res[0],3), np.uint8) |
|
|
|
|
prev_image4 = np.zeros((image_res[1],image_res[0],3), np.uint8) |
|
|
|
|
def display(image_res, q1, q2, q3, q4, fullscreen, output_dir): |
|
|
|
|
logger = logging.getLogger('display') |
|
|
|
|
|
|
|
|
|
empty_image = np.zeros((image_res[1],image_res[0],3), np.uint8) |
|
|
|
|
prev_image1 = None |
|
|
|
|
prev_result2 = None |
|
|
|
|
prev_result3 = None |
|
|
|
|
prev_result4 = None |
|
|
|
|
|
|
|
|
|
if fullscreen: |
|
|
|
|
cv2.namedWindow("output", cv2.WND_PROP_FULLSCREEN) |
|
|
|
|
cv2.setWindowProperty("output",cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN) |
|
|
|
|
|
|
|
|
|
override_image = None |
|
|
|
|
override_until = None |
|
|
|
|
|
|
|
|
|
while True: |
|
|
|
|
logging.debug('r') |
|
|
|
|
try: |
|
|
|
@ -454,53 +423,79 @@ def display(image_res, q1, q2, q3, q4, fullscreen = False):
@@ -454,53 +423,79 @@ def display(image_res, q1, q2, q3, q4, fullscreen = False):
|
|
|
|
|
image1 = cv2.resize(image1, (image_res[0], image_res[1])) |
|
|
|
|
prev_image1 = image1 |
|
|
|
|
except Empty as e: |
|
|
|
|
image1 = prev_image1 |
|
|
|
|
image1 = prev_image1 if prev_image1 is not None else empty_image |
|
|
|
|
try: |
|
|
|
|
result2 = q2.get_nowait() |
|
|
|
|
result2 = result2.resize(image_res[0], image_res[1]) |
|
|
|
|
result2.draw_detections() |
|
|
|
|
image2 = result2.visualisation |
|
|
|
|
# image2 = cv2.resize(image2, (image_res[0], image_res[1])) |
|
|
|
|
prev_image2 = image2 |
|
|
|
|
prev_result2 = result2 |
|
|
|
|
except Empty as e: |
|
|
|
|
image2 = prev_image2 |
|
|
|
|
result2 = prev_result2 |
|
|
|
|
finally: |
|
|
|
|
image2 = result2.draw_detections() if result2 is not None else empty_image |
|
|
|
|
try: |
|
|
|
|
result3 = q3.get_nowait() |
|
|
|
|
result3 = result3.resize(image_res[0], image_res[1]) |
|
|
|
|
result3.draw_detections() |
|
|
|
|
image3 = result3.visualisation |
|
|
|
|
# image3 = cv2.resize(image3, (image_res[0], image_res[1])) |
|
|
|
|
prev_image3 = image3 |
|
|
|
|
prev_result3 = result3 |
|
|
|
|
except Empty as e: |
|
|
|
|
image3 = prev_image3 |
|
|
|
|
result3 = prev_result3 |
|
|
|
|
finally: |
|
|
|
|
image3 = result3.draw_detections() if result3 is not None else empty_image |
|
|
|
|
try: |
|
|
|
|
result4 = q4.get_nowait() |
|
|
|
|
result4 = result4.resize(image_res[0], image_res[1]) |
|
|
|
|
result4.draw_detections() |
|
|
|
|
image4 = result4.visualisation |
|
|
|
|
# image4 = cv2.resize(image4, (image_res[0], image_res[1])) |
|
|
|
|
prev_image4 = image4 |
|
|
|
|
prev_result4 = result4 |
|
|
|
|
except Empty as e: |
|
|
|
|
image4 = prev_image4 |
|
|
|
|
result4 = prev_result4 |
|
|
|
|
finally: |
|
|
|
|
image4 = result4.draw_detections() if result4 is not None else empty_image |
|
|
|
|
|
|
|
|
|
if override_image is not None and override_until > time.time(): |
|
|
|
|
cv2.imshow("output", override_image) |
|
|
|
|
else: |
|
|
|
|
override_image = None |
|
|
|
|
|
|
|
|
|
img_concate_Verti1 = np.concatenate((image1,image2),axis=0) |
|
|
|
|
img_concate_Verti2 = np.concatenate((image3,image4),axis=0) |
|
|
|
|
grid_img = np.concatenate((img_concate_Verti1,img_concate_Verti2),axis=1) |
|
|
|
|
cv2.imshow("output", grid_img) |
|
|
|
|
img_concate_Verti1 = np.concatenate((image1,image2),axis=0) |
|
|
|
|
img_concate_Verti2 = np.concatenate((image3,image4),axis=0) |
|
|
|
|
grid_img = np.concatenate((img_concate_Verti1,img_concate_Verti2),axis=1) |
|
|
|
|
cv2.imshow("output", grid_img) |
|
|
|
|
|
|
|
|
|
# Hit 'q' on the keyboard to quit! |
|
|
|
|
key = cv2.waitKey(1) & 0xFF |
|
|
|
|
if key == ord('q'): |
|
|
|
|
break |
|
|
|
|
if key == ord(' '): |
|
|
|
|
# TODO save frame |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def main(camera_id, rotate, fullscreen, cascade_file): |
|
|
|
|
# TODO wait for frame to be processed. Eg. if I move and make a pic, it should use the last frame... |
|
|
|
|
output_res = (image_res[0] *2, image_res[1] * 2) |
|
|
|
|
pil_im = Image.fromarray(cv2.cvtColor(image1, cv2.COLOR_BGR2RGB)) |
|
|
|
|
pil_im = pil_im.resize(output_res) |
|
|
|
|
draw = ImageDraw.Draw(pil_im, 'RGBA') |
|
|
|
|
|
|
|
|
|
if result2 is not None: |
|
|
|
|
result2.resize(output_res[0], output_res[1]).draw_detections_on(draw) |
|
|
|
|
if result3 is not None: |
|
|
|
|
result3.resize(output_res[0], output_res[1]).draw_detections_on(draw) |
|
|
|
|
if result4 is not None: |
|
|
|
|
result4.resize(output_res[0], output_res[1]).draw_detections_on(draw) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
override_image = cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR) |
|
|
|
|
override_until = time.time() + 5 |
|
|
|
|
logger.info("Show frame until %f", override_until) |
|
|
|
|
|
|
|
|
|
# save images: |
|
|
|
|
name = datetime.datetime.now().isoformat(timespec='seconds') |
|
|
|
|
cv2.imwrite(os.path.join(output_dir, f'{name}.png'),override_image) |
|
|
|
|
cv2.imwrite(os.path.join(output_dir, f'{name}-hog.png'),result2.visualisation) |
|
|
|
|
cv2.imwrite(os.path.join(output_dir, f'{name}-dnn.png'),result3.visualisation) |
|
|
|
|
cv2.imwrite(os.path.join(output_dir, f'{name}-haar.png'),result4.visualisation) |
|
|
|
|
|
|
|
|
|
def main(camera_id, rotate, fullscreen, cascade_file, output_dir): |
|
|
|
|
image_size = (int(1920/2), int(1080/2)) |
|
|
|
|
|
|
|
|
|
if not os.path.exists(cascade_file): |
|
|
|
|
raise RuntimeError(f"Cannot load OpenCV haar-cascade file '{cascade_file}'") |
|
|
|
|
if not os.path.isdir(output_dir): |
|
|
|
|
raise RuntimeError(f"Non-existent directory to store files '{output_dir}'") |
|
|
|
|
|
|
|
|
|
is_rotated_90 = rotate in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE] |
|
|
|
|
|
|
|
|
@ -519,7 +514,7 @@ def main(camera_id, rotate, fullscreen, cascade_file):
@@ -519,7 +514,7 @@ def main(camera_id, rotate, fullscreen, cascade_file):
|
|
|
|
|
q_process3 = Queue(maxsize=1) |
|
|
|
|
|
|
|
|
|
p1 = Process(target=record, args=(camera_id, q_webcam1, q_webcam2,q_webcam3,q_webcam4, image_size, rotate)) |
|
|
|
|
p2 = Process(target=display, args=(image_size, q_webcam1, q_process1, q_process2, q_process3, fullscreen )) |
|
|
|
|
p2 = Process(target=display, args=(image_size, q_webcam1, q_process1, q_process2, q_process3, fullscreen, output_dir )) |
|
|
|
|
p3 = Process(target=process1_hog, args=(q_webcam2, q_process1,)) |
|
|
|
|
p4 = Process(target=process2_dnn, args=(q_webcam3, q_process2,)) |
|
|
|
|
p5 = Process(target=process3_haar, args=(q_webcam4, q_process3,cascade_file)) |
|
|
|
|