face_recognition/face_recognition/comparison.py

561 lines
20 KiB
Python

from multiprocessing import Process, Queue
from queue import Empty, Full
import cv2
import logging
import argparse
import numpy as np
import time
import datetime
from PIL import ImageFont, ImageDraw, Image
import os
draw_colors = {
'hog': (198,65,124),
'haar': (255,255,255),
'dnn': (251,212,36),
}
titles = {
'hog' : "Histogram of oriented gradients",
'haar' : "Haar cascades",
'dnn' : "Neural network",
}
fontfile = "SourceSansPro-Regular.ttf"
font = ImageFont.truetype(fontfile, 30)
font_s = ImageFont.truetype(fontfile, 20)
class Result():
def __init__(self, algorithm, image, confidence_threshold = 0.5):
self.algorithm = algorithm
self.visualisation = image
self.detections = []
self.confidence_threshold = confidence_threshold
def add_detection(self, startX, startY, endX, endY, confidence):
self.detections.append({
'startX': startX,
'startY': startY,
'endX': endX,
'endY': endY,
'confidence': confidence
})
return self
def draw_detections(self, include_title = False):
cv2_im_rgb = cv2.cvtColor(self.visualisation,cv2.COLOR_BGR2RGB)
# Pass the image to PIL
pil_im = Image.fromarray(cv2_im_rgb)
draw = ImageDraw.Draw(pil_im, 'RGBA')
self.draw_detections_on(draw)
if include_title:
draw.text((10,10), titles[self.algorithm], fill=draw_colors[self.algorithm], font=font)
return cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)
def draw_detections_on(self, draw: ImageDraw):
'''
Draw on a specified canvas
'''
color = draw_colors[self.algorithm]
for detection in self.detections:
self.draw_detection(draw, detection, color)
def draw_detection(self, draw: ImageDraw, detection: dict, color: tuple):
width = 2
if detection['confidence'] > self.confidence_threshold:
# draw the bounding box of the face along with the associated
# probability
text = "{:.2f}%".format(detection['confidence'] * 100)
y = detection['startY'] - 40 if detection['startY'] - 40 > 10 else detection['startY'] + 10
draw.text((detection['startX'], y), text, font=font, fill=color)
# cv2.putText(self.visualisation, text, (detection['startX'], y),
# cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2, lineType = cv2.LINE_AA)
alpha = 1
else:
# At least 10% opacity
alpha = max(.3, detection['confidence'])
color = list(color)
color.append(int(alpha*255))
color = tuple(color)
draw.rectangle((detection['startX'], detection['startY'], detection['endX'], detection['endY']), outline=color, width=width)
def resize(self, width, height):
# TODO resize to new target incl all detections
img = self.visualisation
factor_x = width / self.visualisation.shape[1]
factor_y = height / self.visualisation.shape[0]
inter = cv2.INTER_NEAREST if self.algorithm in ['dnn', 'haar'] else cv2.INTER_CUBIC
img = cv2.resize(img, (width, height), interpolation=inter)
result = Result(self.algorithm, img, self.confidence_threshold)
for d in self.detections:
result.add_detection(
int(d['startX'] * factor_x),
int(d['startY'] * factor_y),
int(d['endX'] * factor_x),
int(d['endY'] * factor_y),
d['confidence']
)
return result
def count_detections(self):
detections = [d for d in self.detections if d['confidence'] > self.confidence_threshold]
return len(detections)
def record(device_id, q1,q2, q3, q4, resolution, rotate):
capture = cv2.VideoCapture(device_id)
is_rotated_90 = rotate in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]
capture.set(cv2.CAP_PROP_FRAME_WIDTH, resolution[1] if is_rotated_90 else resolution[0])
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, resolution[0] if is_rotated_90 else resolution[1])
while True:
ret, image = capture.read()
if image is None:
logging.critical("Error with camera?")
exit()
if rotate is not None:
image = cv2.rotate(image, rotate)
# print(image.shape[:2], image.shape[1::-1])
if image.shape[1::-1] != resolution:
logging.warning(f"Camera resultion seems wrong: {image.shape[:2]} instead of {resolution}")
try:
q1.put_nowait(image)
except Full as e:
# ignore if processing doesn't keep up
pass
try:
q2.put_nowait(image)
except Full as e:
# ignore if processing doesn't keep up
pass
try:
q3.put_nowait(image)
except Full as e:
# ignore if processing doesn't keep up
pass
try:
q4.put_nowait(image)
except Full as e:
# ignore if processing doesn't keep up
pass
def draw_detection(image, startX, startY, endX, endY, confidence, color=(0,0,255), confidence_threshold = .5):
# First we crop the sub-rect from the image
sub_img = image[startY:endY, startX:endX]
rect_img = sub_img.copy()
width = 2
cv2.rectangle(rect_img, (0, 0),
(sub_img.shape[1]-int(width/2), sub_img.shape[0]-int(width/2)),
color, width)
# white_rect = np.ones(sub_img.shape, dtype=np.uint8) * 255
# filter out weak detections by ensuring the `confidence` is
# greater than the minimum confidence
if confidence > confidence_threshold:
# draw the bounding box of the face along with the associated
# probability
text = "{:.2f}%".format(confidence * 100)
y = startY - 10 if startY - 10 > 10 else startY + 10
# cv2.rectangle(image, (startX, startY), (endX, endY),
# color, 2)
cv2.putText(image, text, (startX, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2)
alpha = 1
else:
# At least 10% opacity
alpha = max(.3, confidence)
res = cv2.addWeighted(sub_img, 1-alpha, rect_img, alpha, 1.0)
# Putting the image back to its position
image[startY:endY, startX:endX] = res
def process1_hog(in_q, out_q):
from skimage.feature import hog as hog_orig
from .hog import hog # use modified version for viz
from skimage import data, exposure
import matplotlib.pyplot as plt
import dlib
import matplotlib.pyplot as plt
# Get the color map by name:
cm = plt.get_cmap('plasma')
face_detector = dlib.get_frontal_face_detector()
visualisation_factor = 1
detection_factor = .4
process_this_frame = True
while True:
if process_this_frame:
# Grab a single frame of video
frame = in_q.get()
frame = cv2.cvtColor(src=frame, code=cv2.COLOR_BGR2GRAY)
viz_frame = cv2.resize(frame, (0, 0), fx=visualisation_factor, fy=visualisation_factor)
det_frame = cv2.resize(frame, (0, 0), fx=detection_factor, fy=detection_factor)
start = time.time()
fd, hog_image = hog(det_frame, orientations=6, pixels_per_cell=(8, 8),
cells_per_block=(1, 1), visualize=True, multichannel=False, visualize_factor=visualisation_factor/detection_factor)
logging.debug(f"Duration of hog viz: {time.time() - start}")
hog_image_rescaled = exposure.rescale_intensity(hog_image, in_range=(0, 10))
# hog_image_rescaled = viz_frame
# Resize frame of video to 1/4 size for faster face recognition processing
# Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses)
# rgb_small_frame = det_frame[:, :, ::-1]
# dets, scores, idxs = face_detector.run(rgb_small_frame, 1, -2)
dets, scores, idxs = face_detector.run(det_frame, 1, -2)
# print(dets, scores, idxs)
hog_image_rescaled = (hog_image_rescaled.astype('float32') * 255).astype('uint8')
# hog_image_rescaled = cv2.cvtColor(hog_image_rescaled, cv2.COLOR_GRAY2BGR)
# blue background:
# hog_image_rescaled[:,:,0] = 200
# Apply the colormap like a function to any array:
colored_image = (cm(hog_image_rescaled) * 255).astype('uint8')
colored_image = cv2.cvtColor(colored_image, cv2.COLOR_RGB2BGR)
# result = Result('hog', hog_image_rescaled, 0)
result = Result('hog', colored_image, 0)
# Display the results
for i, rectangle in enumerate(dets):
probability = scores[i]
# print(rectangle)
# Scale back up face locations since the frame we detected in was scaled to 1/4 size
top = int(rectangle.top() * (visualisation_factor / detection_factor))
right = int(rectangle.right() * (visualisation_factor / detection_factor))
bottom = int(rectangle.bottom() * (visualisation_factor / detection_factor))
left = int(rectangle.left() * (visualisation_factor / detection_factor))
result.add_detection(left, top, right, bottom,probability)
# draw_detection(hog_image_rescaled, left, top, right, bottom, probability, draw_colors['hog'], 0)
# brightness = int(min(255, (probability + 1)*255))
# # Draw a box around the face
# cv2.rectangle(hog_image_rescaled, (left, top), (right, bottom), (0,0,brightness), 2)
# # Draw a label with a name below the face
# cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
# Display the resulting image
out_q.put(result)
# print(cgray.shape)
process_this_frame = not process_this_frame
def process2_dnn(in_q, out_q):
logger = logging.getLogger('dnn')
prototxt = "dnn/face_detector/opencv_face_detector.pbtxt"
prototxt = "dnn/face_detector/deploy.prototxt"
model = "dnn/face_detector/res10_300x300_ssd_iter_140000_fp16.caffemodel"
confidence_threshold = 0.5
logger.info("[INFO] loding model...")
net = cv2.dnn.readNetFromCaffe(prototxt, model)
logger.info("Loaded")
while True:
image = in_q.get()
(h, w) = image.shape[:2]
image_small = cv2.resize(image, (300, 300))
(hs, ws) = image_small.shape[:2]
blob = cv2.dnn.blobFromImage(image_small, 1.0,
(300, 300), (104.0, 177.0, 123.0))
image = cv2.cvtColor(cv2.cvtColor(image_small, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)
net.setInput(blob)
detections = net.forward()
# idxs = np.argsort(detections[0])[::-1][:5]
result = Result('dnn', image)
for i in range(0, detections.shape[2]):
# extract the confidence (i.e., probability) associated with the
# prediction
confidence = detections[0, 0, i, 2]
# compute the (x, y)-coordinates of the bounding box for the
# object
# box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
box = detections[0, 0, i, 3:7] * np.array([ws, hs, ws, hs])
(startX, startY, endX, endY) = box.astype("int")
result.add_detection(startX, startY, endX, endY, confidence)
# draw_detection(image, startX, startY, endX, endY, confidence, draw_colors['dnn'])
out_q.put(result)
def process3_haar(in_q, out_q, cascade_file):
from cffi import FFI
from PIL import Image
import cv2
import os
logger = logging.getLogger('haar')
ffi = FFI()
ffi.cdef("""
int test(int);
typedef void* haarclassifier;
haarclassifier classifier_new(char *filename);
void scan_image(haarclassifier, size_t width,size_t height, char *input, char *buffer, size_t length, bool debug);
""")
dir_path = os.path.dirname(os.path.realpath(__file__))
lib_path = os.path.join(dir_path, "..", "visualhaar", "target", "debug")
so_path = os.path.join(lib_path, "libvisual_haarcascades_lib.so")
dll_path = os.path.join(lib_path, "visual_haarcascades_lib.dll")
if os.path.exists(so_path):
C = ffi.dlopen(so_path)
elif os.path.exists(dll_path):
C = ffi.dlopen(dll_path)
else:
raise RuntimeException("Visual haarcascades library is not found")
# print(C.test(9))
# i = Image.open("Marjo.jpg")
# width = i.size[0]
# height = i.size[0]
# use the rust lib to draw the visualisation
filename = cascade_file.encode('ascii')
fn = ffi.new("char[]", filename)
haar = C.classifier_new(fn)
logger.info("Initialised haar classifier")
# opencv for the actual detections
faceCascade = cv2.CascadeClassifier(cascade_file)
while True:
frame = in_q.get()
(height_orig, width_orig) = frame.shape[:2]
scale_factor = 3
width = int(width_orig/scale_factor)
height = int(height_orig/scale_factor)
frame = cv2.resize(frame, (width, height))
# Run the B&W version through opencv haar to detect faces
# for some reason the variable 'frame' is modified after
# running the visualisation, so we do this before
f = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = faceCascade.detectMultiScale(f)
pixel_format = "RGB" #The raytracer only supports one format
bytes_per_pixel = 3
buffer_len = width * height * bytes_per_pixel
buffer = ffi.new("char[]", buffer_len)
buffer2 = ffi.from_buffer("char[]", frame.tobytes())
# i = Image.open("/home/ruben/Documents/Projecten/(2020/rust/lena_orig.png")
# data = i.tobytes("raw", "RGB")
logger.info("Start haar scan")
start = time.time()
C.scan_image(haar, width, height, buffer2, buffer, buffer_len, False)
logger.info(f"Visualised scan into buffer: {buffer}")
print(f"duration: {time.time() - start}s")
img = Image.frombuffer(pixel_format, (width, height), ffi.buffer(buffer),
"raw", pixel_format, 0, 1)
img= np.array(img)
# a= np.frombuffer(ffi.buffer(buffer))
# a.reshape((height, width, bytes_per_pixel))
# flip RGB back to BGR
# img = img[:, :, ::-1]
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (width_orig, height_orig))
result = Result('haar', img)
for face in faces:
x1, y1, w, h = face
x2 = x1 + w
y2 = y1 + h
# print(img.shape)
# TODO: is scale factor ok here?
# draw_detection(img, x1 * scale_factor, y1 * scale_factor, x2 * scale_factor, y2 * scale_factor, 1, draw_colors['haar'],)
result.add_detection(x1 * scale_factor, y1 * scale_factor, x2 * scale_factor, y2 * scale_factor, 1)
# print(img)
out_q.put(result)
def draw_stats(image, results):
pil_im = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_im, 'RGBA')
for i, result in enumerate(results):
if result is None:
continue
c = result.count_detections()
txt = "face" if c == 1 else "faces"
txt = f"{result.algorithm.ljust(5)} {c} {txt}"
draw.text((10, pil_im.size[1] - i*25 - 50), txt, fill=draw_colors[result.algorithm], font=font_s, stroke_width=1, stroke_fill=(0,0,0))
return cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)
def display(image_res, q1, q2, q3, q4, fullscreen, output_dir):
logger = logging.getLogger('display')
empty_image = np.zeros((image_res[1],image_res[0],3), np.uint8)
results = [None, None, None]
result_queues = [q2, q3, q4]
images = [empty_image, empty_image, empty_image, empty_image]
override_image = None
override_until = None
if fullscreen:
cv2.namedWindow("output", cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty("output",cv2.WND_PROP_FULLSCREEN,cv2.WINDOW_FULLSCREEN)
while True:
logging.debug('r')
try:
image = q1.get_nowait()
images[0] = cv2.resize(image, (image_res[0], image_res[1]))
except Empty as e:
pass
for idx, queue in enumerate(result_queues):
try:
result = queue.get_nowait()
results[idx] = result.resize(image_res[0], image_res[1])
images[idx+1] = results[idx].draw_detections(include_title=True)
except Empty as e:
pass
finally:
pass
if override_image is not None and override_until > time.time():
cv2.imshow("output", override_image)
else:
override_image = None
images[0] = draw_stats(images[0], results)
img_concate_Verti1 = np.concatenate((images[0],images[1]),axis=0)
img_concate_Verti2 = np.concatenate((images[2],images[3]),axis=0)
grid_img = np.concatenate((img_concate_Verti1,img_concate_Verti2),axis=1)
cv2.imshow("output", grid_img)
# Hit 'q' on the keyboard to quit!
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
if key == ord(' '):
# TODO wait for frame to be processed. Eg. if I move and make a pic, it should use the last frame...
output_res = (image_res[0] *2, image_res[1] * 2)
pil_im = Image.fromarray(cv2.cvtColor(images[0], cv2.COLOR_BGR2RGB))
pil_im = pil_im.resize(output_res)
draw = ImageDraw.Draw(pil_im, 'RGBA')
for result in results:
if result is None:
continue
result.resize(output_res[0], output_res[1]).draw_detections_on(draw)
override_image = cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)
override_until = time.time() + 5
logger.info("Show frame until %f", override_until)
# save images:
name = datetime.datetime.now().isoformat(timespec='seconds')
cv2.imwrite(os.path.join(output_dir, f'{name}.png'),override_image)
for result in results:
cv2.imwrite(os.path.join(output_dir, f'{name}-{result.algorithm}.png'),result.visualisation)
def main(camera_id, rotate, fullscreen, cascade_file, output_dir):
image_size = (int(1920/2), int(1080/2))
if not os.path.exists(cascade_file):
raise RuntimeError(f"Cannot load OpenCV haar-cascade file '{cascade_file}'")
if not os.path.isdir(output_dir):
raise RuntimeError(f"Non-existent directory to store files '{output_dir}'")
is_rotated_90 = rotate in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]
if is_rotated_90:
image_size = (image_size[1], image_size[0])
# TODO should we use queues here at all?
# https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
# TODO: queue maxsize, or prefrabily some sort of throttled queue (like zmq hight water mark)
q_webcam1 = Queue(maxsize=1)
q_webcam2 = Queue(maxsize=1)
q_webcam3 = Queue(maxsize=1)
q_webcam4 = Queue(maxsize=1)
q_process1 = Queue(maxsize=1)
q_process2 = Queue(maxsize=1)
q_process3 = Queue(maxsize=1)
p1 = Process(target=record, args=(camera_id, q_webcam1, q_webcam2,q_webcam3,q_webcam4, image_size, rotate))
p2 = Process(target=display, args=(image_size, q_webcam1, q_process1, q_process2, q_process3, fullscreen, output_dir ))
p3 = Process(target=process1_hog, args=(q_webcam2, q_process1,))
p4 = Process(target=process2_dnn, args=(q_webcam3, q_process2,))
p5 = Process(target=process3_haar, args=(q_webcam4, q_process3,cascade_file))
p1.start()
p2.start()
p3.start()
p4.start()
p5.start()
p2.join() # process with the display interface
p1.kill()
p3.kill()
p4.kill()
p5.kill()