Live visualisation of various facial recognition algorithms.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

693 lines
26 KiB

from multiprocessing import Process, Queue
from queue import Empty, Full
import cv2
import logging
import argparse
import numpy as np
import time
import math
import datetime
from PIL import ImageFont, ImageDraw, Image
import os
import sys
# Colour per algorithm, used as fill/outline colour in the PIL drawing calls.
draw_colors = dict(
    hog=(198, 65, 124),
    haar=(255, 255, 255),
    dnn=(251, 212, 36),
)

# Human-readable title per algorithm.
titles = dict(
    hog="Histogram of oriented gradients",
    haar="Haar cascades",
    dnn="Neural network",
)

# The project directory is the parent of the directory that holds this file.
project_dir = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    os.pardir,
)
fontfile = os.path.join(project_dir, "SourceSansPro-Regular.ttf")

# The same typeface at 30, 20 and 160 points (loaded in that order).
font, font_s, countdown_font = (
    ImageFont.truetype(fontfile, size) for size in (30, 20, 160)
)
class Result():
    """Output of one detection algorithm: a visualisation image plus boxes.

    Attributes:
        algorithm: key into ``draw_colors`` / ``titles`` ('hog', 'haar', 'dnn').
        visualisation: image array; converted from BGR when it is drawn on.
        detections: list of dicts with the keys ``startX``, ``startY``,
            ``endX``, ``endY`` and ``confidence``.
        confidence_threshold: a detection counts as confident when its
            confidence is strictly greater than this value.
    """

    def __init__(self, algorithm, image, confidence_threshold = 0.5):
        self.algorithm = algorithm
        self.visualisation = image
        self.detections = []
        self.confidence_threshold = confidence_threshold

    def _base_colour(self, coloured):
        """Return the algorithm colour when ``coloured``, otherwise white."""
        if coloured:
            return draw_colors[self.algorithm]
        return (255, 255, 255)

    def add_detection(self, startX, startY, endX, endY, confidence):
        """Append one bounding box and return ``self`` so calls can be chained."""
        self.detections.append(dict(
            startX=startX,
            startY=startY,
            endX=endX,
            endY=endY,
            confidence=confidence,
        ))
        return self

    def draw_detections(self, include_title = False, coloured=False):
        """Return a BGR copy of the visualisation with all detections drawn.

        When ``include_title`` is set, the algorithm title is written in the
        top-left corner as well.
        """
        # Hand the image to PIL (which works in RGB) for drawing.
        canvas = Image.fromarray(cv2.cvtColor(self.visualisation, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(canvas, 'RGBA')

        self.draw_detections_on(draw, coloured)

        if include_title:
            draw.text(
                (10, 10),
                titles[self.algorithm],
                fill=self._base_colour(coloured),
                font=font,
                stroke_width=1,
                stroke_fill=(0, 0, 0, 100),
            )

        return cv2.cvtColor(np.array(canvas), cv2.COLOR_RGB2BGR)

    def draw_detections_on(self, draw: ImageDraw, coloured=False, onlyIfConfident=False):
        """Draw every stored detection on the given canvas."""
        colour = self._base_colour(coloured)
        for detection in self.detections:
            self.draw_detection(draw, detection, colour, onlyIfConfident)

    def draw_detection(self, draw: ImageDraw, detection: dict, color: tuple, onlyIfConfident: bool = False):
        """Draw a single detection.

        Confident detections get a percentage label, thin dark guide lines and
        an opaque outline of fixed width. Other detections are skipped when
        ``onlyIfConfident`` is set; otherwise their outline width and opacity
        follow the confidence.
        """
        confidence = detection['confidence']
        confident = confidence > self.confidence_threshold

        if not confident and onlyIfConfident:
            # Only confident detections were requested.
            return

        left, top = detection['startX'], detection['startY']
        right, bottom = detection['endX'], detection['endY']

        if confident:
            thickness = 8
            opacity = 1
            # Label with the probability; it moves inside the box when there
            # is no room above it.
            label = "{:.0f}%".format(confidence * 100)
            label_y = top - 40 if top - 40 > 10 else top + 10
            draw.text((left, label_y), label, font=font, fill=color, stroke_fill=(0,0,0,100), stroke_width=1)
            # Thin translucent black lines just outside and just inside the
            # thick outline drawn below.
            draw.rectangle((left-1, top-1, right+1, bottom+1), outline=(0,0,0,100), width=1)
            draw.rectangle((left+thickness, top+thickness, right-thickness, bottom-thickness), outline=(0,0,0,100), width=1)
        else:
            thickness = int(confidence * 10 * 8)
            # Opacity never drops below .2
            opacity = max(.2, confidence)

        outline = tuple(color) + (int(opacity*255),)
        draw.rectangle((left, top, right, bottom), outline=outline, width=thickness)

    def resize(self, width, height, flip=False):
        """Return a new Result scaled to ``width`` x ``height``.

        The detections are scaled along with the image; with ``flip`` both the
        image and the boxes are mirrored horizontally.
        """
        source_height, source_width = self.visualisation.shape[:2]
        factor_x = width / source_width
        factor_y = height / source_height

        if self.algorithm in ['dnn', 'haar']:
            interpolation = cv2.INTER_NEAREST
        else:
            interpolation = cv2.INTER_CUBIC

        scaled = cv2.resize(self.visualisation, (width, height), interpolation=interpolation)
        if flip:
            scaled = cv2.flip(scaled, 1)

        resized = Result(self.algorithm, scaled, self.confidence_threshold)
        for d in self.detections:
            if flip:
                # Mirroring swaps the left and right edges of the box.
                new_start_x = int(width - d['endX'] * factor_x)
                new_end_x = int(width - d['startX'] * factor_x)
            else:
                new_start_x = int(d['startX'] * factor_x)
                new_end_x = int(d['endX'] * factor_x)
            resized.add_detection(
                new_start_x,
                int(d['startY'] * factor_y),
                new_end_x,
                int(d['endY'] * factor_y),
                d['confidence'],
            )
        return resized

    def count_detections(self):
        """Return the number of detections above the confidence threshold."""
        confident = [d for d in self.detections if d['confidence'] > self.confidence_threshold]
        return len(confident)
def record(device_id, q1, q2, q3, q4, resolution, rotate):
    """Read frames from a camera forever and offer each one to four queues.

    Args:
        device_id: camera identifier handed to ``cv2.VideoCapture``.
        q1, q2, q3, q4: queues that each receive every frame. A frame is
            dropped for a queue that is full, so slow consumers never block
            the capture loop.
        resolution: expected ``(width, height)`` of the delivered frames,
            i.e. after rotation.
        rotate: a ``cv2.ROTATE_*`` code, or ``None`` for no rotation.

    The process exits (``sys.exit``) when the camera stops delivering frames.
    The capture device is released on the way out.
    """
    capture = cv2.VideoCapture(device_id)
    is_rotated_90 = rotate in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]
    # The camera is configured before rotation, so the requested axes are
    # swapped when the frame is turned by 90 degrees afterwards.
    capture.set(cv2.CAP_PROP_FRAME_WIDTH, resolution[1] if is_rotated_90 else resolution[0])
    capture.set(cv2.CAP_PROP_FRAME_HEIGHT, resolution[0] if is_rotated_90 else resolution[1])

    expected_size = tuple(resolution)
    frame_queues = (q1, q2, q3, q4)
    gave_camera_warning = False

    try:
        while True:
            _, image = capture.read()
            if image is None:
                logging.critical("Error with camera?")
                sys.exit()

            if rotate is not None:
                image = cv2.rotate(image, rotate)

            # Flip image to create the 'mirror' effect.
            image = cv2.flip(image, 1)

            # Report the size in (width, height) order, the same order as
            # `resolution`, so the two values in the message are comparable.
            actual_size = image.shape[1::-1]
            if actual_size != expected_size and not gave_camera_warning:
                logging.warning(f"Camera resultion seems wrong: {actual_size} instead of {resolution}")
                gave_camera_warning = True

            for frame_queue in frame_queues:
                try:
                    frame_queue.put_nowait(image)
                except Full:
                    # ignore if processing doesn't keep up
                    pass
    finally:
        capture.release()
def draw_detection(image, startX, startY, endX, endY, confidence, color=(0,0,255), confidence_threshold = .5):
    """Draw one detection box (and, if confident, its score) onto ``image``.

    The image is modified in place. The box is clipped to the image bounds;
    a box that lies completely outside the image, or that is empty, is
    ignored.

    Args:
        image: array indexed as ``image[y, x]``, modified in place.
        startX, startY, endX, endY: box corners in pixel coordinates.
        confidence: detection score. Above ``confidence_threshold`` the box is
            drawn opaque together with a percentage label; otherwise the box
            is blended in with the confidence as opacity (at least .3).
        color: colour of the rectangle and the label.
        confidence_threshold: score above which a detection counts as
            confident.
    """
    image_height, image_width = image.shape[:2]
    # Clip to the image: negative indices would otherwise wrap around in the
    # slice below and select the wrong region (or an empty one).
    x0, y0 = max(0, startX), max(0, startY)
    x1, y1 = min(image_width, endX), min(image_height, endY)
    if x1 <= x0 or y1 <= y0:
        return

    # First we crop the sub-rect from the image
    sub_img = image[y0:y1, x0:x1]
    rect_img = sub_img.copy()

    width = 2
    cv2.rectangle(rect_img, (0, 0),
        (sub_img.shape[1]-int(width/2), sub_img.shape[0]-int(width/2)),
        color, width)

    # Weak detections are blended in; confident ones are drawn opaque.
    confident = confidence > confidence_threshold
    # At least 30% opacity
    alpha = 1 if confident else max(.3, confidence)

    res = cv2.addWeighted(sub_img, 1-alpha, rect_img, alpha, 1.0)

    # Putting the image back to its position
    image[y0:y1, x0:x1] = res

    if confident:
        # Write the probability last: when there is no room above the box the
        # label sits inside it, and writing the blended region back would
        # otherwise erase the text.
        text = "{:.2f}%".format(confidence * 100)
        y = y0 - 10 if y0 - 10 > 10 else y0 + 10
        cv2.putText(image, text, (x0, y),
            cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2)
def process1_hog(in_q, out_q):
    """Worker loop: HOG visualisation plus dlib face detection per frame.

    Each frame taken from ``in_q`` is converted to greyscale and scaled down.
    It is then turned into a colour-mapped HOG visualisation and scanned with
    dlib's frontal face detector. A ``Result('hog', ...)`` holding the
    visualisation and all detections (with dlib's score as confidence and a
    threshold of 0) is put on ``out_q``.

    Args:
        in_q: queue delivering BGR frames.
        out_q: queue receiving one ``Result`` per processed frame.
    """
    # Heavy dependencies are imported here so that only this worker loads them.
    from .hog import hog # use modified version for viz
    from skimage import exposure
    import matplotlib.pyplot as plt
    import dlib

    # Get the color map by name:
    cm = plt.get_cmap('plasma')

    face_detector = dlib.get_frontal_face_detector()

    visualisation_factor = 1
    detection_factor = .3
    # Factor that maps coordinates in the detection frame back to the
    # visualisation.
    upscale = visualisation_factor / detection_factor

    while True:
        # Grab a single frame of video (blocks until one is available)
        frame = in_q.get()
        frame = cv2.cvtColor(src=frame, code=cv2.COLOR_BGR2GRAY)
        # Detection and visualisation run on a scaled-down frame for speed.
        det_frame = cv2.resize(frame, (0, 0), fx=detection_factor, fy=detection_factor)

        start = time.time()
        _, hog_image = hog(det_frame, orientations=6, pixels_per_cell=(8, 8),
            cells_per_block=(1, 1), visualize=True, multichannel=False, visualize_factor=upscale)
        logging.debug(f"Duration of hog viz: {time.time() - start}")
        hog_image_rescaled = exposure.rescale_intensity(hog_image, in_range=(0, 10))

        # Upsample once and use -2 as score adjustment, so that weak
        # candidates are reported as well.
        dets, scores, idxs = face_detector.run(det_frame, 1, -2)

        hog_image_rescaled = (hog_image_rescaled.astype('float32') * 255).astype('uint8')

        # Apply the colormap like a function to any array:
        colored_image = (cm(hog_image_rescaled) * 255).astype('uint8')
        colored_image = cv2.cvtColor(colored_image, cv2.COLOR_RGB2BGR)

        result = Result('hog', colored_image, 0)

        for rectangle, probability in zip(dets, scores):
            # Scale face locations back up, since the frame we detected in
            # was scaled down by detection_factor.
            top = int(rectangle.top() * upscale)
            right = int(rectangle.right() * upscale)
            bottom = int(rectangle.bottom() * upscale)
            left = int(rectangle.left() * upscale)
            result.add_detection(left, top, right, bottom, probability)

        out_q.put(result)
def process2_dnn(in_q, out_q):
    """Worker loop: face detection with OpenCV's Caffe SSD network.

    Each frame from ``in_q`` is resized to 300x300 and fed to the network. A
    ``Result('dnn', ...)`` is put on ``out_q``; it holds a greyscale version
    of the 300x300 input and every box the network returns, in the pixel
    coordinates of that 300x300 image.

    Args:
        in_q: queue delivering BGR frames.
        out_q: queue receiving one ``Result`` per processed frame.
    """
    logger = logging.getLogger('dnn')

    # Paths are relative to the current working directory.
    prototxt = "dnn/face_detector/deploy.prototxt"
    model = "dnn/face_detector/res10_300x300_ssd_iter_140000_fp16.caffemodel"

    # NOTE(review): a local confidence_threshold of 0.7 used to be defined here
    # but was never handed to Result, so Result's default threshold applies.
    # Confirm which value is intended before wiring it in.

    logger.info("[INFO] loding model...")
    net = cv2.dnn.readNetFromCaffe(prototxt, model)
    logger.info("Loaded")

    while True:
        image = in_q.get()

        image_small = cv2.resize(image, (300, 300))
        (hs, ws) = image_small.shape[:2]
        blob = cv2.dnn.blobFromImage(image_small, 1.0,
            (300, 300), (104.0, 177.0, 123.0))

        # The visualisation is the network input with its colour removed.
        image = cv2.cvtColor(cv2.cvtColor(image_small, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)

        net.setInput(blob)
        detections = net.forward()

        result = Result('dnn', image)

        for i in range(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated with the
            # prediction
            confidence = detections[0, 0, i, 2]
            # The box is scaled by the size of the 300x300 image to get its
            # (x, y) pixel coordinates.
            box = detections[0, 0, i, 3:7] * np.array([ws, hs, ws, hs])
            (startX, startY, endX, endY) = box.astype("int")
            result.add_detection(startX, startY, endX, endY, confidence)

        out_q.put(result)
def process3_haar(in_q, out_q, cascade_file, library_filename = None):
    """Worker loop: Haar-cascade visualisation plus OpenCV face detection.

    A native library is loaded through cffi and draws the visualisation of
    the cascade scan. The detections themselves come from OpenCV's
    ``CascadeClassifier``, loaded from the same cascade file.

    Args:
        in_q: queue delivering BGR frames.
        out_q: queue receiving one ``Result('haar', ...)`` per frame.
        cascade_file: path of the cascade file; it is handed to the native
            library as ASCII bytes and to OpenCV as is.
        library_filename: path of the native library. When ``None``, a few
            known locations below ``project_dir`` are tried.

    Raises:
        RuntimeError: when no library path is given and none of the known
            locations exists.
    """
    from cffi import FFI
    from PIL import Image
    import cv2
    import os
    logger = logging.getLogger('haar')
    ffi = FFI()
    # Declarations of the functions that are called in the native library.
    ffi.cdef("""
int test(int);
typedef void* haarclassifier;
haarclassifier classifier_new(char *filename);
void scan_image(haarclassifier, size_t width,size_t height, char *input, char *buffer, size_t length, size_t min_face_factor, bool debug);
""")
    if library_filename is not None:
        C = ffi.dlopen(library_filename)
    else:
        # No explicit library: use the first candidate that exists on disk.
        lib_path = os.path.join(project_dir, "visualhaar", "target", "release")
        possible_paths = [
            os.path.join(lib_path, "libvisual_haarcascades_lib.so"),
            os.path.join(lib_path, "visual_haarcascades_lib.dll"),
            os.path.join(project_dir, "visual_haarcascades_lib.dll"),
        ]
        existing_paths = [p for p in possible_paths if os.path.exists(p)]
        if not len(existing_paths):
            raise RuntimeError("Visual haarcascades library is not found")
        logger.debug(f"Using library: {existing_paths[0]}")
        C = ffi.dlopen(existing_paths[0])
    # print(C.test(9))
    # i = Image.open("Marjo.jpg")
    # width = i.size[0]
    # height = i.size[0]
    # use the rust lib to draw the visualisation
    filename = cascade_file.encode('ascii')
    fn = ffi.new("char[]", filename)
    haar = C.classifier_new(fn)
    logger.info("Initialised haar classifier")
    # opencv for the actual detections
    faceCascade = cv2.CascadeClassifier(cascade_file)
    while True:
        frame = in_q.get()
        (height_orig, width_orig) = frame.shape[:2]
        # Both the scan and the detection run on a frame scaled down by this
        # factor.
        scale_factor = 4
        width = int(width_orig/scale_factor)
        height = int(height_orig/scale_factor)
        frame = cv2.resize(frame, (width, height))
        # Run the B&W version through opencv haar to detect faces
        # for some reason the variable 'frame' is modified after
        # running the visualisation, so we do this before
        f = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = faceCascade.detectMultiScale(f)
        pixel_format = "RGB" #The raytracer only supports one format
        bytes_per_pixel = 3
        buffer_len = width * height * bytes_per_pixel
        # `buffer` is the output buffer handed to scan_image, `buffer2` holds
        # the bytes of the input frame.
        buffer = ffi.new("char[]", buffer_len)
        buffer2 = ffi.from_buffer("char[]", frame.tobytes())
        # i = Image.open("/home/ruben/Documents/Projecten/(2020/rust/lena_orig.png")
        # data = i.tobytes("raw", "RGB")
        logger.info("Start haar scan")
        start = time.time()
        C.scan_image(haar, width, height, buffer2, buffer, buffer_len, 5, False)
        logger.info(f"Visualised scan into buffer: {buffer}")
        # print(f"duration: {time.time() - start}s")
        # Wrap the output buffer in a PIL image, then convert it to an array.
        img = Image.frombuffer(pixel_format, (width, height), ffi.buffer(buffer),
            "raw", pixel_format, 0, 1)
        img= np.array(img)
        # a= np.frombuffer(ffi.buffer(buffer))
        # a.reshape((height, width, bytes_per_pixel))
        # Swap the channel order of the array (RGB <-> BGR).
        # img = img[:, :, ::-1]
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # The visualisation goes back to the size of the incoming frame.
        img = cv2.resize(img, (width_orig, height_orig))
        result = Result('haar', img)
        for face in faces:
            x1, y1, w, h = face
            x2 = x1 + w
            y2 = y1 + h
            # print(img.shape)
            # TODO: is scale factor ok here?
            # draw_detection(img, x1 * scale_factor, y1 * scale_factor, x2 * scale_factor, y2 * scale_factor, 1, draw_colors['haar'],)
            # Coordinates are scaled back up by scale_factor; the confidence
            # is always 1.
            result.add_detection(x1 * scale_factor, y1 * scale_factor, x2 * scale_factor, y2 * scale_factor, 1)
        # print(img)
        out_q.put(result)
def draw_stats(image, results, padding, coloured=False, drawDetections=False):
    """Return a copy of the BGR ``image`` with the detection statistics drawn.

    The image makes a round trip through PIL (BGR -> RGB -> BGR); the drawing
    itself is done by ``draw_stats_on_canvas``.
    """
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    canvas = Image.fromarray(rgb)
    draw_stats_on_canvas(
        ImageDraw.Draw(canvas, 'RGBA'),
        results,
        padding,
        coloured,
        drawDetections,
    )
    annotated = np.array(canvas)
    return cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR)
def draw_stats_on_canvas(draw, results, padding, coloured=False, drawDetections=False):
    """Write one "<algorithm> <n> face(s)" line per result on the canvas.

    The lines are stacked upwards from the bottom-left corner, one slot per
    position in ``results``; ``None`` entries leave their slot empty. With
    ``drawDetections`` the confident detections of each result are drawn on
    the same canvas too.
    """
    for slot, result in enumerate(results):
        if result is not None:
            count = result.count_detections()
            noun = "faces" if count != 1 else "face"
            label = f"{result.algorithm.ljust(5)} {count} {noun}"

            line_height = padding + 25
            if coloured:
                colour = draw_colors[result.algorithm]
            else:
                colour = (255,255,255)

            position = (padding, draw.im.size[1] - (slot+1)*line_height - padding)
            draw.text(position, label, fill=colour, font=font, stroke_width=2, stroke_fill=(0,0,0))

            if drawDetections:
                result.draw_detections_on(draw, coloured, onlyIfConfident=True)
def display(image_res, q1, q2, q3, q4, fullscreen, output_dir):
    """Show the camera image and the three visualisations; handle snapshots.

    One image fills the window, and the other three are shown as previews in
    the bottom-right corner. A left click on a preview selects it as the main
    image.

    Keys:
        q / escape: quit.
        space: start a three-second countdown. When it ends, the clean frame,
            a combined image and one image per available visualisation are
            written to ``output_dir``. The combined image then stays on
            screen for five seconds.

    Args:
        image_res: ``(width, height)`` of the displayed image.
        q1: queue delivering camera frames.
        q2, q3, q4: queues delivering ``Result`` objects, one per algorithm.
        fullscreen: open the window fullscreen instead of auto-sized.
        output_dir: directory that receives the snapshot files.

    Raises:
        RuntimeError: when OpenCV fails to write a snapshot image.
    """
    logger = logging.getLogger('display')

    empty_image = np.zeros((image_res[1],image_res[0],3), np.uint8)
    image_ratio = image_res[0] / image_res[1]

    results = [None, None, None]
    result_queues = [q2, q3, q4]
    # images[0] is the camera frame, images[1:] the drawn visualisations.
    images = [empty_image, empty_image, empty_image, empty_image]

    override_image = None
    override_until = None
    countdown_until = None

    # grid in the right corner
    preview_scale = 10
    preview_width = round(image_res[0] / preview_scale)
    preview_height = round(preview_width / image_ratio)
    padding = round(image_res[0] / 100)

    if fullscreen:
        cv2.namedWindow("output", cv2.WINDOW_NORMAL)
        cv2.setWindowProperty("output",cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)
    else:
        cv2.namedWindow("output", cv2.WINDOW_AUTOSIZE)

    def selectPreview(event, x, y, flags, param):
        """Mouse callback: a left click on a preview makes it the main image."""
        if event != cv2.EVENT_LBUTTONDOWN:
            return
        # Ignore clicks left or right of the preview column.
        if x > image_res[0] - padding or x < image_res[0] - padding - preview_width:
            return

        preview_images = [idx for idx,image in enumerate(images) if idx != selectPreview.imageIdx]
        for offset, image_nr in enumerate(preview_images):
            offset_y = (preview_height + padding) * offset
            if y > image_res[1] - padding - preview_height - offset_y and y < image_res[1] - padding - offset_y:
                selectPreview.imageIdx = image_nr
                print("Select image", offset, image_nr)
                break

    # The selected index lives on the function object, so that the callback
    # and the loop below share it.
    selectPreview.imageIdx = 0
    cv2.setMouseCallback('output', selectPreview)

    while True:
        # Take whatever is new; keep the previous image when a queue is empty.
        try:
            image = q1.get_nowait()
            images[0] = cv2.resize(image, (image_res[0], image_res[1]))
        except Empty:
            pass

        for idx, result_queue in enumerate(result_queues):
            try:
                result = result_queue.get_nowait()
            except Empty:
                continue
            results[idx] = result.resize(image_res[0], image_res[1])
            images[idx+1] = results[idx].draw_detections(include_title=True)

        if override_image is not None and override_until > time.time():
            cv2.imshow("output", override_image)
        else:
            override_image = None

            # show the selected image:
            grid_img = images[selectPreview.imageIdx].copy()

            # previews in the right bottom corner
            preview_images = [image for idx,image in enumerate(images) if idx != selectPreview.imageIdx]
            for idx, image in enumerate(preview_images):
                offset_y = (preview_height + padding) * idx
                # The interpolation flag has to be passed by keyword: the
                # third positional parameter of cv2.resize is `dst`.
                grid_img[
                    grid_img.shape[0] - padding - preview_height - offset_y:grid_img.shape[0] - padding - offset_y,
                    grid_img.shape[1] - padding - preview_width:grid_img.shape[1] - padding] = cv2.resize(image, (preview_width, preview_height), interpolation=cv2.INTER_CUBIC)

            # statistics
            # for the plain webcam image (no viz), draw all detected faces.
            drawDetections = (selectPreview.imageIdx == 0)
            grid_img = draw_stats(grid_img, results, padding, coloured=True, drawDetections=drawDetections)

            # Draw countdown
            if countdown_until:
                pil_im = Image.fromarray(cv2.cvtColor(grid_img, cv2.COLOR_BGR2RGB))
                draw = ImageDraw.Draw(pil_im, 'RGBA')
                text = f"{math.ceil(countdown_until - time.time())}"
                if hasattr(draw, 'textbbox'):
                    # Newer Pillow releases no longer have textsize().
                    left, top, right, bottom = draw.textbbox((0, 0), text, font=countdown_font)
                    w, h = right - left, bottom - top
                else:
                    w, h = draw.textsize(text, font=countdown_font)
                draw.text(((grid_img.shape[1]-w)/2,(grid_img.shape[0]-h)/2), text, fill="white", font=countdown_font, stroke_width=1, stroke_fill=(0,0,0,100))
                grid_img = cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)

            cv2.imshow("output", grid_img)

        # Hit 'q' on the keyboard to quit!
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q') or key == 27: # key 27: escape
            break
        if key == ord(' ') and override_image is None:
            countdown_until = time.time() + 3 # seconds of countdown

        # SNAP! SAVE FRAMES
        if countdown_until is not None and time.time() > countdown_until:
            countdown_until = None
            # TODO wait for frame to be processed. Eg. if I move and make a pic, it should use the last frame...

            output_res = image_res # no scaling needed anymore
            # The camera frame is flipped back for the saved picture.
            pil_im = Image.fromarray(cv2.cvtColor(cv2.flip(images[0],1), cv2.COLOR_BGR2RGB))
            pil_im = pil_im.resize(output_res)

            # base name for all images
            name = datetime.datetime.now().isoformat(timespec='seconds').replace(':','-')

            # filename of clean frame
            filename = os.path.join(output_dir, f'{name}-frame.jpg')
            pil_im.save(filename)

            # now draw all results to the main image
            draw = ImageDraw.Draw(pil_im, 'RGBA')
            for result in results:
                if result is None:
                    continue
                result.resize(output_res[0], output_res[1], flip=True).draw_detections_on(draw, coloured=True)
            draw_stats_on_canvas(draw, results, padding, coloured=True)

            override_image = cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)
            override_until = time.time() + 5
            logger.info("Show frame until %f", override_until)

            # save images:
            filename = os.path.join(output_dir, f'{name}-all.png')
            print(f"Save to {filename}")
            r = cv2.imwrite(filename, override_image)
            if not r:
                raise RuntimeError(f"Could not save image {filename}")

            # finally, store each visualisation with the results
            for result in results:
                if result is None:
                    # This algorithm has not delivered anything yet.
                    continue
                result_img = result.draw_detections(include_title = True)
                filename = os.path.join(output_dir, f'{name}-{result.algorithm}.png')
                r = cv2.imwrite(filename, result_img)
                if not r:
                    raise RuntimeError(f"Could not save image {filename}")
def main(camera_id, rotate, fullscreen, cascade_file, output_dir, visualhaar_lib = None):
    """Start the recorder, the display and the three detectors as processes.

    The call blocks until the display process ends; the other processes are
    killed after that.

    Raises:
        RuntimeError: when ``cascade_file`` does not exist or ``output_dir``
            is not a directory.
    """
    if not os.path.exists(cascade_file):
        raise RuntimeError(f"Cannot load OpenCV haar-cascade file '{cascade_file}'")
    if not os.path.isdir(output_dir):
        raise RuntimeError(f"Non-existent directory to store files '{output_dir}'")

    frame_width, frame_height = 1920, 1080
    if rotate in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]:
        # A quarter turn swaps the two axes of the image.
        image_size = (frame_height, frame_width)
    else:
        image_size = (frame_width, frame_height)

    # TODO should we use queues here at all?
    # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
    # TODO: queue maxsize, or preferably some sort of throttled queue (like zmq high water mark)
    # One frame queue per consumer (display + three detectors) and one result
    # queue per detector, each holding a single item at most.
    webcam_queues = [Queue(maxsize=1) for _ in range(4)]
    result_queues = [Queue(maxsize=1) for _ in range(3)]

    recorder = Process(target=record, args=(camera_id, *webcam_queues, image_size, rotate))
    viewer = Process(target=display, args=(image_size, webcam_queues[0], *result_queues, fullscreen, output_dir))
    detectors = [
        Process(target=process1_hog, args=(webcam_queues[1], result_queues[0])),
        Process(target=process2_dnn, args=(webcam_queues[2], result_queues[1])),
        Process(target=process3_haar, args=(webcam_queues[3], result_queues[2], cascade_file, visualhaar_lib)),
    ]

    for process in (recorder, viewer, *detectors):
        process.start()

    viewer.join() # process with the display interface

    for process in (recorder, *detectors):
        process.kill()