# face_recognition/face_recognition/comparison.py

from multiprocessing import Process, Queue
from queue import Empty, Full
import cv2
import logging
import argparse
import numpy as np
import time
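
# Overview (as implemented below):
# - record() grabs frames from the webcam and fans them out over four queues,
#   dropping a frame for any consumer whose queue is still full.
# - process1_hog(), process2_dnn() and process3_haar() each run one face
#   detection algorithm in their own process and emit Result objects.
# - display() draws the detections and composes the four streams into a 2x2 grid.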

# OpenCV images are BGR, so these colours are blue (dnn), green (haar) and red (hog).
draw_colors = {
    'dnn': (255, 0, 0),
    'haar': (0, 255, 0),
    'hog': (0, 0, 255),
}


class Result:
    def __init__(self, algorithm, image, confidence_threshold=0.5):
        self.algorithm = algorithm
        self.visualisation = image
        self.detections = []
        self.confidence_threshold = confidence_threshold

    def add_detection(self, startX, startY, endX, endY, confidence):
        self.detections.append({
            'startX': startX,
            'startY': startY,
            'endX': endX,
            'endY': endY,
            'confidence': confidence
        })
        return self

    def draw_detections(self):
        color = draw_colors[self.algorithm]
        for detection in self.detections:
            self.draw_detection(detection, color)

    def draw_detection(self, detection, color=(0, 0, 255)):
        # First we crop the sub-rect from the image
        sub_img = self.visualisation[detection['startY']:detection['endY'],
                                     detection['startX']:detection['endX']]
        rect_img = sub_img.copy()
        width = 2
        cv2.rectangle(rect_img, (0, 0),
                      (sub_img.shape[1] - int(width / 2), sub_img.shape[0] - int(width / 2)),
                      color, width)
        # filter out weak detections by ensuring the `confidence` is
        # greater than the minimum confidence
        if detection['confidence'] > self.confidence_threshold:
            # draw the bounding box of the face along with the associated
            # probability
            text = "{:.2f}%".format(detection['confidence'] * 100)
            y = detection['startY'] - 10 if detection['startY'] - 10 > 10 else detection['startY'] + 10
            cv2.putText(self.visualisation, text, (detection['startX'], y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2, lineType=cv2.LINE_AA)
            alpha = 1
        else:
            # fade out weak detections: at least 30% opacity
            alpha = max(.3, detection['confidence'])
        # blend the rectangle overlay onto the original sub-image
        res = cv2.addWeighted(sub_img, 1 - alpha, rect_img, alpha, 1.0)
        # Putting the image back to its position
        self.visualisation[detection['startY']:detection['endY'],
                           detection['startX']:detection['endX']] = res

    def resize(self, width, height):
        # TODO resize to new target incl all detections
        img = self.visualisation
        factor_x = width / self.visualisation.shape[1]
        factor_y = height / self.visualisation.shape[0]
        inter = cv2.INTER_NEAREST if self.algorithm in ['dnn', 'haar'] else cv2.INTER_CUBIC
        img = cv2.resize(img, (width, height), interpolation=inter)
        result = Result(self.algorithm, img, self.confidence_threshold)
        for d in self.detections:
            result.add_detection(
                int(d['startX'] * factor_x),
                int(d['startY'] * factor_y),
                int(d['endX'] * factor_x),
                int(d['endY'] * factor_y),
                d['confidence']
            )
        return result
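

# record() reads frames from the webcam and pushes a copy of each frame onto
# every consumer queue; when a queue is still full (maxsize=1) the frame is
# simply dropped, so slow detectors never block the capture loop.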
def record(device_id, q1, q2, q3, q4):
    capture = cv2.VideoCapture(device_id)
    while True:
        ret, image = capture.read()
        logging.debug('r')
        if not ret:
            # skip failed reads instead of pushing empty frames downstream
            continue
        for q in (q1, q2, q3, q4):
            try:
                q.put_nowait(image)
            except Full:
                # ignore if processing doesn't keep up
                pass
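

# Standalone variant of Result.draw_detection(), operating directly on an
# image and explicit coordinates rather than on a stored detection dict.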
def draw_detection(image, startX, startY, endX, endY, confidence, color=(0, 0, 255), confidence_threshold=.5):
    # First we crop the sub-rect from the image
    sub_img = image[startY:endY, startX:endX]
    rect_img = sub_img.copy()
    width = 2
    cv2.rectangle(rect_img, (0, 0),
                  (sub_img.shape[1] - int(width / 2), sub_img.shape[0] - int(width / 2)),
                  color, width)
    # filter out weak detections by ensuring the `confidence` is
    # greater than the minimum confidence
    if confidence > confidence_threshold:
        # draw the bounding box of the face along with the associated
        # probability
        text = "{:.2f}%".format(confidence * 100)
        y = startY - 10 if startY - 10 > 10 else startY + 10
        cv2.putText(image, text, (startX, y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2)
        alpha = 1
    else:
        # fade out weak detections: at least 30% opacity
        alpha = max(.3, confidence)
    # blend the rectangle overlay onto the original sub-image
    res = cv2.addWeighted(sub_img, 1 - alpha, rect_img, alpha, 1.0)
    # Putting the image back to its position
    image[startY:endY, startX:endX] = res
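

# HOG worker: converts each frame to grayscale, computes a HOG visualisation
# (via the project's modified skimage hog) and runs dlib's frontal face
# detector on a downscaled copy, scaling the boxes back up for display.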
def process1_hog(in_q, out_q):
    from skimage.feature import hog as hog_orig
    from .hog import hog  # use modified version for viz
    from skimage import data, exposure
    import matplotlib.pyplot as plt
    import dlib

    face_detector = dlib.get_frontal_face_detector()
    visualisation_factor = 1
    detection_factor = .4
    process_this_frame = True

    while True:
        if process_this_frame:
            # Grab a single frame of video
            frame = in_q.get()
            frame = cv2.cvtColor(src=frame, code=cv2.COLOR_BGR2GRAY)
            viz_frame = cv2.resize(frame, (0, 0), fx=visualisation_factor, fy=visualisation_factor)
            det_frame = cv2.resize(frame, (0, 0), fx=detection_factor, fy=detection_factor)

            start = time.time()
            fd, hog_image = hog(det_frame, orientations=6, pixels_per_cell=(8, 8),
                                cells_per_block=(1, 1), visualize=True, multichannel=False,
                                visualize_factor=visualisation_factor / detection_factor)
            logging.debug(f"Duration of hog viz: {time.time() - start}")
            hog_image_rescaled = exposure.rescale_intensity(hog_image, in_range=(0, 10))

            # run dlib's detector on the downscaled grayscale frame
            dets, scores, idxs = face_detector.run(det_frame, 1, -2)

            hog_image_rescaled = (hog_image_rescaled.astype('float32') * 255).astype('uint8')
            hog_image_rescaled = cv2.cvtColor(hog_image_rescaled, cv2.COLOR_GRAY2BGR)

            result = Result('hog', hog_image_rescaled, 0)
            # Display the results
            for i, rectangle in enumerate(dets):
                probability = scores[i]
                # Scale face locations back up, since detection ran on a downscaled frame
                top = int(rectangle.top() * (visualisation_factor / detection_factor))
                right = int(rectangle.right() * (visualisation_factor / detection_factor))
                bottom = int(rectangle.bottom() * (visualisation_factor / detection_factor))
                left = int(rectangle.left() * (visualisation_factor / detection_factor))
                result.add_detection(left, top, right, bottom, probability)

            out_q.put(result)
        process_this_frame = not process_this_frame
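

# DNN worker: runs OpenCV's Caffe-based SSD face detector (res10, 300x300
# input with mean subtraction) and adds one detection per SSD output row.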
def process2_dnn(in_q, out_q):
    logger = logging.getLogger('dnn')

    # prototxt = "dnn/face_detector/opencv_face_detector.pbtxt"
    prototxt = "dnn/face_detector/deploy.prototxt"
    model = "dnn/face_detector/res10_300x300_ssd_iter_140000_fp16.caffemodel"
    confidence_threshold = 0.5

    logger.info("Loading model...")
    net = cv2.dnn.readNetFromCaffe(prototxt, model)
    logger.info("Loaded")

    while True:
        image = in_q.get()
        (h, w) = image.shape[:2]
        image_small = cv2.resize(image, (300, 300))
        (hs, ws) = image_small.shape[:2]
        blob = cv2.dnn.blobFromImage(image_small, 1.0,
                                     (300, 300), (104.0, 177.0, 123.0))
        # grayscale copy of the small input image as background for the visualisation
        image = cv2.cvtColor(cv2.cvtColor(image_small, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)

        net.setInput(blob)
        detections = net.forward()
        result = Result('dnn', image)
        for i in range(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated with the
            # prediction
            confidence = detections[0, 0, i, 2]
            # compute the (x, y)-coordinates of the bounding box for the
            # object, relative to the small (300x300) image
            box = detections[0, 0, i, 3:7] * np.array([ws, hs, ws, hs])
            (startX, startY, endX, endY) = box.astype("int")
            result.add_detection(startX, startY, endX, endY, confidence)
        out_q.put(result)
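

# Haar worker: uses a Rust library (loaded through cffi) to render the cascade
# visualisation, while OpenCV's CascadeClassifier provides the actual
# detections; boxes are scaled back to the original frame size.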
def process3_haar(in_q, out_q):
    from cffi import FFI
    from PIL import Image
    import cv2

    logger = logging.getLogger('haar')

    ffi = FFI()
    ffi.cdef("""
        int test(int);

        typedef void* haarclassifier;
        haarclassifier classifier_new();
        void scan_image(haarclassifier, size_t width, size_t height, char *input, char *buffer, size_t length, bool debug);
    """)
    C = ffi.dlopen("/home/ruben/Documents/Projecten/2020/rust/testproject/target/debug/libvisual_haarcascades_lib.so")

    # use the rust lib to draw the visualisation
    haar = C.classifier_new()
    logger.info("Initialised haar classifier")

    # opencv for the actual detections
    faceCascade = cv2.CascadeClassifier('./haarcascade_frontalface_alt2.xml')

    while True:
        frame = in_q.get()
        (height_orig, width_orig) = frame.shape[:2]
        scale_factor = 3
        width = int(width_orig / scale_factor)
        height = int(height_orig / scale_factor)
        frame = cv2.resize(frame, (width, height))

        # Run the B&W version through opencv haar to detect faces
        # for some reason the variable 'frame' is modified after
        # running the visualisation, so we do this before
        f = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = faceCascade.detectMultiScale(f)

        pixel_format = "RGB"  # the visualisation library only supports this format
        bytes_per_pixel = 3
        buffer_len = width * height * bytes_per_pixel
        buffer = ffi.new("char[]", buffer_len)
        buffer2 = ffi.from_buffer("char[]", frame.tobytes())

        logger.info("Start haar scan")
        start = time.time()
        C.scan_image(haar, width, height, buffer2, buffer, buffer_len, False)
        logger.info(f"Visualised scan into buffer: {buffer}")
        print(f"duration: {time.time() - start}s")

        img = Image.frombuffer(pixel_format, (width, height), ffi.buffer(buffer),
                               "raw", pixel_format, 0, 1)
        img = np.array(img)
        # flip RGB back to BGR
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (width_orig, height_orig))

        result = Result('haar', img)
        for face in faces:
            x1, y1, w, h = face
            x2 = x1 + w
            y2 = y1 + h
            # detection ran on the downscaled frame, so scale the box back up
            # TODO: is scale factor ok here?
            result.add_detection(x1 * scale_factor, y1 * scale_factor,
                                 x2 * scale_factor, y2 * scale_factor, 1)
        out_q.put(result)
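

# display() pulls the latest image/Result from each queue (keeping the
# previous frame whenever a queue is empty), draws the detections and shows
# the four streams as a 2x2 grid.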
def display(image_res, q1, q2, q3, q4):
    prev_image1 = np.zeros((image_res[1], image_res[0], 3), np.uint8)
    prev_image2 = np.zeros((image_res[1], image_res[0], 3), np.uint8)
    prev_image3 = np.zeros((image_res[1], image_res[0], 3), np.uint8)
    prev_image4 = np.zeros((image_res[1], image_res[0], 3), np.uint8)

    while True:
        logging.debug('r')
        try:
            image1 = q1.get_nowait()
            image1 = cv2.resize(image1, (image_res[0], image_res[1]))
            prev_image1 = image1
        except Empty:
            image1 = prev_image1
        try:
            result2 = q2.get_nowait()
            result2 = result2.resize(image_res[0], image_res[1])
            result2.draw_detections()
            image2 = result2.visualisation
            prev_image2 = image2
        except Empty:
            image2 = prev_image2
        try:
            result3 = q3.get_nowait()
            result3 = result3.resize(image_res[0], image_res[1])
            result3.draw_detections()
            image3 = result3.visualisation
            prev_image3 = image3
        except Empty:
            image3 = prev_image3
        try:
            result4 = q4.get_nowait()
            result4 = result4.resize(image_res[0], image_res[1])
            result4.draw_detections()
            image4 = result4.visualisation
            prev_image4 = image4
        except Empty:
            image4 = prev_image4

        img_concate_Verti1 = np.concatenate((image1, image2), axis=0)
        img_concate_Verti2 = np.concatenate((image3, image4), axis=0)
        grid_img = np.concatenate((img_concate_Verti1, img_concate_Verti2), axis=1)
        cv2.imshow("Output", grid_img)

        # Hit 'q' on the keyboard to quit!
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
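

# main() wires everything together: one recorder, three detector processes and
# the display process, connected through single-slot queues so frames are
# dropped rather than buffered when a detector falls behind.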
def main(camera_id):
    image_size = (int(1920 / 2), int(1080 / 2))

    # TODO should we use queues here at all?
    # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
    # TODO: queue maxsize, or preferably some sort of throttled queue (like zmq high water mark)
    q_webcam1 = Queue(maxsize=1)
    q_webcam2 = Queue(maxsize=1)
    q_webcam3 = Queue(maxsize=1)
    q_webcam4 = Queue(maxsize=1)
    q_process1 = Queue(maxsize=1)
    q_process2 = Queue(maxsize=1)
    q_process3 = Queue(maxsize=1)

    p1 = Process(target=record, args=(camera_id, q_webcam1, q_webcam2, q_webcam3, q_webcam4))
    p2 = Process(target=display, args=(image_size, q_webcam1, q_process1, q_process2, q_process3))
    p3 = Process(target=process1_hog, args=(q_webcam2, q_process1))
    p4 = Process(target=process2_dnn, args=(q_webcam3, q_process2))
    p5 = Process(target=process3_haar, args=(q_webcam4, q_process3))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()

    p2.join()  # process with the display interface
    p1.kill()
    p3.kill()
    p4.kill()
    p5.kill()