from multiprocessing import Process, Queue
from queue import Empty, Full
import cv2
import logging
import argparse
import numpy as np
import time
import math
import datetime
from PIL import ImageFont, ImageDraw, Image
import os
import sys

draw_colors = {
    'hog': (198, 65, 124),
    'haar': (255, 255, 255),
    'dnn': (251, 212, 36),
}

titles = {
    'hog': "Histogram of oriented gradients",
    'haar': "Haar cascades",
    'dnn': "Neural network",
}

project_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')

fontfile = os.path.join(project_dir, "SourceSansPro-Regular.ttf")

font = ImageFont.truetype(fontfile, 30)
font_s = ImageFont.truetype(fontfile, 20)
countdown_font = ImageFont.truetype(fontfile, 160)


class Result():
    def __init__(self, algorithm, image, confidence_threshold=0.5):
        self.algorithm = algorithm
        self.visualisation = image
        self.detections = []
        self.confidence_threshold = confidence_threshold

    def add_detection(self, startX, startY, endX, endY, confidence):
        self.detections.append({
            'startX': startX,
            'startY': startY,
            'endX': endX,
            'endY': endY,
            'confidence': confidence
        })
        return self

    def draw_detections(self, include_title=False, coloured=False):
        cv2_im_rgb = cv2.cvtColor(self.visualisation, cv2.COLOR_BGR2RGB)
        # Pass the image to PIL
        pil_im = Image.fromarray(cv2_im_rgb)
        draw = ImageDraw.Draw(pil_im, 'RGBA')

        self.draw_detections_on(draw, coloured)

        if include_title:
            color = draw_colors[self.algorithm] if coloured else (255, 255, 255)
            draw.text((10, 10), titles[self.algorithm], fill=color, font=font,
                      stroke_width=1, stroke_fill=(0, 0, 0, 100))

        return cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)

    def draw_detections_on(self, draw: ImageDraw.ImageDraw, coloured=False, onlyIfConfident=False):
        '''
        Draw on a specified canvas
        '''
        color = draw_colors[self.algorithm] if coloured else (255, 255, 255)
        for detection in self.detections:
            self.draw_detection(draw, detection, color, onlyIfConfident)

    def draw_detection(self, draw: ImageDraw.ImageDraw, detection: dict, color: tuple, onlyIfConfident: bool = False):
        if detection['confidence'] > self.confidence_threshold:
            width = 8
            # draw the bounding box of the face along with the associated
            # probability
            text = "{:.0f}%".format(detection['confidence'] * 100)
            y = detection['startY'] - 40 if detection['startY'] - 40 > 10 else detection['startY'] + 10

            draw.text((detection['startX'], y), text, font=font, fill=color,
                      stroke_fill=(0, 0, 0, 100), stroke_width=1)
            # cv2.putText(self.visualisation, text, (detection['startX'], y),
            #     cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2, lineType=cv2.LINE_AA)

            alpha = 1
            # thin black outlines on either side of the coloured frame drawn below
            draw.rectangle((detection['startX'] - 1, detection['startY'] - 1,
                            detection['endX'] + 1, detection['endY'] + 1),
                           outline=(0, 0, 0, 100), width=1)
            draw.rectangle((detection['startX'] + width, detection['startY'] + width,
                            detection['endX'] - width, detection['endY'] - width),
                           outline=(0, 0, 0, 100), width=1)
        elif onlyIfConfident:
            # Only draw detections above the threshold, so this one is skipped.
            return
        else:
            width = int(detection['confidence'] * 10 * 8)
            # At least 20% opacity
            alpha = max(.2, detection['confidence'])

        color = list(color)
        color.append(int(alpha * 255))
        color = tuple(color)

        draw.rectangle((detection['startX'], detection['startY'],
                        detection['endX'], detection['endY']),
                       outline=color, width=width)

    def resize(self, width, height, flip=False):
        # TODO resize to new target incl all detections
        img = self.visualisation
        factor_x = width / self.visualisation.shape[1]
        factor_y = height / self.visualisation.shape[0]
        inter = cv2.INTER_NEAREST if self.algorithm in ['dnn', 'haar'] else cv2.INTER_CUBIC
        img = cv2.resize(img, (width, height), interpolation=inter)
        if flip:
            img = cv2.flip(img, 1)

        result = Result(self.algorithm, img, self.confidence_threshold)
        for d in self.detections:
            if flip:
                result.add_detection(
                    int(width - d['endX'] * factor_x),
                    int(d['startY'] * factor_y),
                    int(width - d['startX'] * factor_x),
                    int(d['endY'] * factor_y),
                    d['confidence']
                )
            else:
                result.add_detection(
                    int(d['startX'] * factor_x),
                    int(d['startY'] * factor_y),
                    int(d['endX'] * factor_x),
                    int(d['endY'] * factor_y),
                    d['confidence']
                )
        return result

    def count_detections(self):
        detections = [d for d in self.detections if d['confidence'] > self.confidence_threshold]
        return len(detections)


def record(device_id, q1, q2, q3, q4, resolution, rotate):
    capture = cv2.VideoCapture(device_id)

    is_rotated_90 = rotate in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]
    capture.set(cv2.CAP_PROP_FRAME_WIDTH, resolution[1] if is_rotated_90 else resolution[0])
    capture.set(cv2.CAP_PROP_FRAME_HEIGHT, resolution[0] if is_rotated_90 else resolution[1])

    gave_camera_warning = False

    while True:
        ret, image = capture.read()

        if image is None:
            logging.critical("Error with camera?")
            sys.exit()

        if rotate is not None:
            image = cv2.rotate(image, rotate)

        # Flip the image to create the 'mirror' effect.
        image = cv2.flip(image, 1)

        # print(image.shape[:2], image.shape[1::-1])
        if image.shape[1::-1] != resolution and not gave_camera_warning:
            logging.warning(f"Camera resolution seems wrong: {image.shape[1::-1]} instead of {resolution}")
            gave_camera_warning = True

        # Feed the frame to every consumer queue; if a consumer doesn't keep up,
        # the frame is simply dropped for that consumer.
        try:
            q1.put_nowait(image)
        except Full as e:
            # ignore if processing doesn't keep up
            pass
        try:
            q2.put_nowait(image)
        except Full as e:
            # ignore if processing doesn't keep up
            pass
        try:
            q3.put_nowait(image)
        except Full as e:
            # ignore if processing doesn't keep up
            pass
        try:
            q4.put_nowait(image)
        except Full as e:
            # ignore if processing doesn't keep up
            pass


def draw_detection(image, startX, startY, endX, endY, confidence, color=(0, 0, 255), confidence_threshold=.5):
    # First we crop the sub-rect from the image
    sub_img = image[startY:endY, startX:endX]
    rect_img = sub_img.copy()
    width = 2
    cv2.rectangle(rect_img, (0, 0),
                  (sub_img.shape[1] - int(width / 2), sub_img.shape[0] - int(width / 2)),
                  color, width)
    # white_rect = np.ones(sub_img.shape, dtype=np.uint8) * 255

    # filter out weak detections by ensuring the `confidence` is
    # greater than the minimum confidence
    if confidence > confidence_threshold:
        # draw the bounding box of the face along with the associated
        # probability
        text = "{:.2f}%".format(confidence * 100)
        y = startY - 10 if startY - 10 > 10 else startY + 10
        # cv2.rectangle(image, (startX, startY), (endX, endY),
        #     color, 2)
        cv2.putText(image, text, (startX, y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2)
        alpha = 1
    else:
        # At least 30% opacity
        alpha = max(.3, confidence)

    res = cv2.addWeighted(sub_img, 1 - alpha, rect_img, alpha, 1.0)

    # Putting the image back to its position
    image[startY:endY, startX:endX] = res


def process1_hog(in_q, out_q):
    # from skimage.feature import hog as hog_orig
    from .hog import hog  # use modified version for viz
    from skimage import exposure
    import matplotlib.pyplot as plt
    import dlib

    # Get the color map by name:
    cm = plt.get_cmap('plasma')

    face_detector = dlib.get_frontal_face_detector()

    visualisation_factor = 1
    detection_factor = .3

    process_this_frame = True
    while True:
        if process_this_frame:
            # Grab a single frame of video
            frame = in_q.get()
            frame = cv2.cvtColor(src=frame, code=cv2.COLOR_BGR2GRAY)
            # viz_frame = cv2.resize(frame, (0, 0), fx=visualisation_factor, fy=visualisation_factor)
            det_frame = cv2.resize(frame, (0, 0), fx=detection_factor, fy=detection_factor)

            start = time.time()
            fd, hog_image = hog(det_frame, orientations=6, pixels_per_cell=(8, 8),
                                cells_per_block=(1, 1), visualize=True, multichannel=False,
                                visualize_factor=visualisation_factor / detection_factor)
            logging.debug(f"Duration of hog viz: {time.time() - start}")
            hog_image_rescaled = exposure.rescale_intensity(hog_image, in_range=(0, 10))
            # hog_image_rescaled = viz_frame

            # Resize frame of video to a smaller size for faster face recognition processing
            # Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses)
            # rgb_small_frame = det_frame[:, :, ::-1]

            # dets, scores, idxs = face_detector.run(rgb_small_frame, 1, -2)
            dets, scores, idxs = face_detector.run(det_frame, 1, -2)
            # print(dets, scores, idxs)

            hog_image_rescaled = (hog_image_rescaled.astype('float32') * 255).astype('uint8')
            # hog_image_rescaled = cv2.cvtColor(hog_image_rescaled, cv2.COLOR_GRAY2BGR)
            # blue background:
            # hog_image_rescaled[:, :, 0] = 200

            # Apply the colormap like a function to any array:
            colored_image = (cm(hog_image_rescaled) * 255).astype('uint8')
            colored_image = cv2.cvtColor(colored_image, cv2.COLOR_RGB2BGR)

            # result = Result('hog', hog_image_rescaled, 0)
            result = Result('hog', colored_image, 0)

            # Display the results
            for i, rectangle in enumerate(dets):
                probability = scores[i]
                # print(rectangle)
                # Scale face locations back up, since the frame we detected in was scaled down
                top = int(rectangle.top() * (visualisation_factor / detection_factor))
                right = int(rectangle.right() * (visualisation_factor / detection_factor))
                bottom = int(rectangle.bottom() * (visualisation_factor / detection_factor))
                left = int(rectangle.left() * (visualisation_factor / detection_factor))

                result.add_detection(left, top, right, bottom, probability)
                # draw_detection(hog_image_rescaled, left, top, right, bottom, probability, draw_colors['hog'], 0)

                # brightness = int(min(255, (probability + 1) * 255))
                # # Draw a box around the face
                # cv2.rectangle(hog_image_rescaled, (left, top), (right, bottom), (0, 0, brightness), 2)
                # # Draw a label with a name below the face
                # cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)

            # Display the resulting image
            out_q.put(result)
            # print(cgray.shape)

        process_this_frame = not process_this_frame


def process2_dnn(in_q, out_q):
    logger = logging.getLogger('dnn')

    # prototxt = "dnn/face_detector/opencv_face_detector.pbtxt"  # unused
    prototxt = "dnn/face_detector/deploy.prototxt"
    model = "dnn/face_detector/res10_300x300_ssd_iter_140000_fp16.caffemodel"
    confidence_threshold = 0.7

    logger.info("Loading model...")
    net = cv2.dnn.readNetFromCaffe(prototxt, model)
    logger.info("Loaded")

    while True:
        image = in_q.get()

        (h, w) = image.shape[:2]
        image_small = cv2.resize(image, (300, 300))
        (hs, ws) = image_small.shape[:2]
        blob = cv2.dnn.blobFromImage(image_small, 1.0, (300, 300), (104.0, 177.0, 123.0))

        image = cv2.cvtColor(cv2.cvtColor(image_small, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)

        net.setInput(blob)
        detections = net.forward()
        # idxs = np.argsort(detections[0])[::-1][:5]

        result = Result('dnn', image)

        for i in range(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated with the
            # prediction
            confidence = detections[0, 0, i, 2]

            # compute the (x, y)-coordinates of the bounding box for the
            # object
            # box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            box = detections[0, 0, i, 3:7] * np.array([ws, hs, ws, hs])
            (startX, startY, endX, endY) = box.astype("int")

            result.add_detection(startX, startY, endX, endY, confidence)
            # draw_detection(image, startX, startY, endX, endY, confidence, draw_colors['dnn'])

        out_q.put(result)


def process3_haar(in_q, out_q, cascade_file, library_filename=None):
    from cffi import FFI
    from PIL import Image
    import cv2
    import os

    logger = logging.getLogger('haar')

    ffi = FFI()
    ffi.cdef("""
    int test(int);

    typedef void* haarclassifier;
    haarclassifier classifier_new(char *filename);
    void scan_image(haarclassifier, size_t width, size_t height, char *input, char *buffer, size_t length, size_t min_face_factor, bool debug);
    """)

    if library_filename is not None:
        C = ffi.dlopen(library_filename)
    else:
        lib_path = os.path.join(project_dir, "visualhaar", "target", "release")
        possible_paths = [
            os.path.join(lib_path, "libvisual_haarcascades_lib.so"),
            os.path.join(lib_path, "visual_haarcascades_lib.dll"),
            os.path.join(project_dir, "visual_haarcascades_lib.dll"),
        ]
        existing_paths = [p for p in possible_paths if os.path.exists(p)]

        if not len(existing_paths):
            raise RuntimeError("Visual haarcascades library is not found")

        logger.debug(f"Using library: {existing_paths[0]}")
        C = ffi.dlopen(existing_paths[0])

    # print(C.test(9))

    # i = Image.open("Marjo.jpg")
    # width = i.size[0]
    # height = i.size[0]

    # use the rust lib to draw the visualisation
    filename = cascade_file.encode('ascii')
    fn = ffi.new("char[]", filename)
    haar = C.classifier_new(fn)
    logger.info("Initialised haar classifier")

    # opencv for the actual detections
    faceCascade = cv2.CascadeClassifier(cascade_file)

    while True:
        frame = in_q.get()
        (height_orig, width_orig) = frame.shape[:2]

        scale_factor = 4
        width = int(width_orig / scale_factor)
        height = int(height_orig / scale_factor)
        frame = cv2.resize(frame, (width, height))

        # Run the B&W version through opencv haar to detect faces.
        # For some reason the variable 'frame' is modified after
        # running the visualisation, so we do this before.
        f = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = faceCascade.detectMultiScale(f)

        pixel_format = "RGB"  # The raytracer only supports one format
        bytes_per_pixel = 3
        buffer_len = width * height * bytes_per_pixel
        buffer = ffi.new("char[]", buffer_len)
        buffer2 = ffi.from_buffer("char[]", frame.tobytes())

        # i = Image.open("/home/ruben/Documents/Projecten/2020/rust/lena_orig.png")
        # data = i.tobytes("raw", "RGB")

        logger.info("Start haar scan")
        start = time.time()
        C.scan_image(haar, width, height, buffer2, buffer, buffer_len, 5, False)
        logger.info(f"Visualised scan into buffer: {buffer}")
        # print(f"duration: {time.time() - start}s")

        img = Image.frombuffer(pixel_format, (width, height), ffi.buffer(buffer),
                               "raw", pixel_format, 0, 1)
        img = np.array(img)
        # a = np.frombuffer(ffi.buffer(buffer))
        # a.reshape((height, width, bytes_per_pixel))

        # flip RGB back to BGR
        # img = img[:, :, ::-1]
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        img = cv2.resize(img, (width_orig, height_orig))

        result = Result('haar', img)

        for face in faces:
            x1, y1, w, h = face
            x2 = x1 + w
            y2 = y1 + h
            # print(img.shape)
            # TODO: is the scale factor ok here?
            # draw_detection(img, x1 * scale_factor, y1 * scale_factor, x2 * scale_factor, y2 * scale_factor, 1, draw_colors['haar'],)
            result.add_detection(x1 * scale_factor, y1 * scale_factor,
                                 x2 * scale_factor, y2 * scale_factor, 1)

        # print(img)
        out_q.put(result)


def draw_stats(image, results, padding, coloured=False, drawDetections=False):
    pil_im = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(pil_im, 'RGBA')

    draw_stats_on_canvas(draw, results, padding, coloured, drawDetections)

    return cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)


def draw_stats_on_canvas(draw, results, padding, coloured=False, drawDetections=False):
    for i, result in enumerate(results):
        if result is None:
            continue

        c = result.count_detections()
        txt = "face" if c == 1 else "faces"
        txt = f"{result.algorithm.ljust(5)} {c} {txt}"
        height = padding + 25
        colour = draw_colors[result.algorithm] if coloured else (255, 255, 255)
        draw.text((padding, draw.im.size[1] - (i + 1) * height - padding), txt,
                  fill=colour, font=font, stroke_width=2, stroke_fill=(0, 0, 0))

        if drawDetections:
            result.draw_detections_on(draw, coloured, onlyIfConfident=True)


def display(image_res, q1, q2, q3, q4, fullscreen, output_dir):
    logger = logging.getLogger('display')

    empty_image = np.zeros((image_res[1], image_res[0], 3), np.uint8)
    image_ratio = image_res[0] / image_res[1]

    results = [None, None, None]
    result_queues = [q2, q3, q4]
    images = [empty_image, empty_image, empty_image, empty_image]

    override_image = None
    override_until = None
    countdown_until = None

    # imageIdx = 0

    # grid in the right corner
    preview_scale = 10
    preview_width = round(image_res[0] / preview_scale)
    preview_height = round(preview_width / image_ratio)
    padding = round(image_res[0] / 100)

    if fullscreen:
        cv2.namedWindow("output", cv2.WINDOW_NORMAL)
        cv2.setWindowProperty("output", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    else:
        cv2.namedWindow("output", cv2.WINDOW_AUTOSIZE)

    def selectPreview(event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            if x > image_res[0] - padding or x < image_res[0] - padding - preview_width:
                return

            preview_images = [idx for idx, image in enumerate(images) if idx != selectPreview.imageIdx]
            for offset, image_nr in enumerate(preview_images):
                offset_y = (preview_height + padding) * offset
                # print(offset, y, image_res[0] - padding - preview_height - offset_y, image_res[0] - padding - offset_y)
                if y > image_res[1] - padding - preview_height - offset_y and y < image_res[1] - padding - offset_y:
                    selectPreview.imageIdx = image_nr
                    print("Select image", offset, image_nr)
                    break

    selectPreview.imageIdx = 0
    cv2.setMouseCallback('output', selectPreview)

    while True:
        try:
            image = q1.get_nowait()
            images[0] = cv2.resize(image, (image_res[0], image_res[1]))
        except Empty as e:
            pass

        for idx, queue in enumerate(result_queues):
            try:
                result = queue.get_nowait()
                results[idx] = result.resize(image_res[0], image_res[1])
                images[idx + 1] = results[idx].draw_detections(include_title=True)
            except Empty as e:
                pass
            finally:
                pass

        if override_image is not None and override_until > time.time():
            cv2.imshow("output", override_image)
        else:
            override_image = None

            # images[0] = draw_stats(images[0], results)

            # show the selected image:
            grid_img = images[selectPreview.imageIdx].copy()

            # previews in the bottom-right corner
            preview_images = [image for idx, image in enumerate(images) if idx != selectPreview.imageIdx]
            for idx, image in enumerate(preview_images):
                offset_y = (preview_height + padding) * idx
                grid_img[
                    grid_img.shape[0] - padding - preview_height - offset_y:grid_img.shape[0] - padding - offset_y,
                    grid_img.shape[1] - padding - preview_width:grid_img.shape[1] - padding
                ] = cv2.resize(image, (preview_width, preview_height), cv2.INTER_CUBIC)

            # statistics
            # for the plain webcam image (no viz), draw all detected faces.
            drawDetections = (selectPreview.imageIdx == 0)
            grid_img = draw_stats(grid_img, results, padding, coloured=True, drawDetections=drawDetections)

            pil_im = Image.fromarray(cv2.cvtColor(grid_img, cv2.COLOR_BGR2RGB))
            draw = ImageDraw.Draw(pil_im, 'RGBA')

            # Draw countdown
            if countdown_until:
                duration = math.ceil(countdown_until - time.time())
                w, h = draw.textsize(f"{duration}", font=countdown_font)
                draw.text(((grid_img.shape[1] - w) / 2, (grid_img.shape[0] - h) / 2), f"{duration}",
                          fill="white", font=countdown_font, stroke_width=1, stroke_fill=(0, 0, 0, 100))

            grid_img = cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)

            # img_concate_Verti1 = np.concatenate((images[0], images[1]), axis=0)
            # img_concate_Verti2 = np.concatenate((images[2], images[3]), axis=0)
            # grid_img = np.concatenate((img_concate_Verti1, img_concate_Verti2), axis=1)
            cv2.imshow("output", grid_img)

        # Hit 'q' on the keyboard to quit!
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q') or key == 27:  # key 27: escape
            break

        # TODO: "the truth value of an array with more than one element is ambiguous,
        # use a.any() or a.all()" (in other words: compare with `override_image is None`)
        if key == ord(' ') and override_image is None:
            countdown_until = time.time() + 3  # seconds of countdown

        # SNAP! SAVE FRAMES
        if countdown_until is not None and time.time() > countdown_until:
            countdown_until = None
            # TODO wait for frame to be processed. Eg. if I move and make a pic, it should use the last frame...

            # output_res = (image_res[0] * 2, image_res[1] * 2)
            output_res = image_res  # no scaling needed anymore
            pil_im = Image.fromarray(cv2.cvtColor(cv2.flip(images[0], 1), cv2.COLOR_BGR2RGB))
            pil_im = pil_im.resize(output_res)

            # base name for all images
            name = datetime.datetime.now().isoformat(timespec='seconds').replace(':', '-')

            # filename of clean frame
            filename = os.path.join(output_dir, f'{name}-frame.jpg')
            pil_im.save(filename)

            # now draw all results to the main image
            draw = ImageDraw.Draw(pil_im, 'RGBA')
            for result in results:
                if result is None:
                    continue
                result.resize(output_res[0], output_res[1], flip=True).draw_detections_on(draw, coloured=True)

            draw_stats_on_canvas(draw, results, padding, coloured=True)

            override_image = cv2.cvtColor(np.array(pil_im), cv2.COLOR_RGB2BGR)
            override_until = time.time() + 5
            logger.info("Show frame until %f", override_until)

            # save images:
            filename = os.path.join(output_dir, f'{name}-all.png')
            print(f"Save to {filename}")
            r = cv2.imwrite(filename, override_image)
            if not r:
                raise RuntimeError(f"Could not save image {filename}")

            # finally, store each visualisation with the results
            for result in results:
                if result is None:
                    continue
                result_img = result.draw_detections(include_title=True)
                filename = os.path.join(output_dir, f'{name}-{result.algorithm}.png')
                r = cv2.imwrite(filename, result_img)
                if not r:
                    raise RuntimeError(f"Could not save image {filename}")


def main(camera_id, rotate, fullscreen, cascade_file, output_dir, visualhaar_lib=None):
    image_size = (1920, 1080)  # (int(1920/2), int(1080/2))

    if not os.path.exists(cascade_file):
        raise RuntimeError(f"Cannot load OpenCV haar-cascade file '{cascade_file}'")

    if not os.path.isdir(output_dir):
        raise RuntimeError(f"Non-existent directory to store files '{output_dir}'")

    is_rotated_90 = rotate in [cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_90_COUNTERCLOCKWISE]
    if is_rotated_90:
        image_size = (image_size[1], image_size[0])

    # TODO should we use queues here at all?
    # https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
    # TODO: queue maxsize, or preferably some sort of throttled queue (like the zmq high water mark)
    q_webcam1 = Queue(maxsize=1)
    q_webcam2 = Queue(maxsize=1)
    q_webcam3 = Queue(maxsize=1)
    q_webcam4 = Queue(maxsize=1)
    q_process1 = Queue(maxsize=1)
    q_process2 = Queue(maxsize=1)
    q_process3 = Queue(maxsize=1)

    p1 = Process(target=record, args=(camera_id, q_webcam1, q_webcam2, q_webcam3, q_webcam4, image_size, rotate))
    p2 = Process(target=display, args=(image_size, q_webcam1, q_process1, q_process2, q_process3, fullscreen, output_dir))
    p3 = Process(target=process1_hog, args=(q_webcam2, q_process1,))
    p4 = Process(target=process2_dnn, args=(q_webcam3, q_process2,))
    p5 = Process(target=process3_haar, args=(q_webcam4, q_process3, cascade_file, visualhaar_lib))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()

    p2.join()  # process with the display interface
    p1.kill()
    p3.kill()
    p4.kill()
    p5.kill()
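

# The module imports argparse but never wires up a command-line entry point for main().
# Below is a minimal sketch of what such an entry point could look like; the flag names,
# defaults (--camera, --rotate, --fullscreen, --cascade, --output, --visualhaar-lib) and
# the logging setup are assumptions, not part of the original interface. Note that
# process1_hog uses a relative import (`from .hog import hog`), so this file would need
# to be started as a package module (e.g. `python -m <package>.<module>`) for that
# worker to function.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Visualise several face-detection algorithms on a webcam stream")
    parser.add_argument('--camera', type=int, default=0,
                        help="ID of the webcam device (hypothetical default)")
    parser.add_argument('--rotate', type=int, default=None,
                        choices=[cv2.ROTATE_90_CLOCKWISE, cv2.ROTATE_180, cv2.ROTATE_90_COUNTERCLOCKWISE],
                        help="Optional cv2 rotation constant applied to each frame")
    parser.add_argument('--fullscreen', action='store_true',
                        help="Show the output window fullscreen")
    parser.add_argument('--cascade', default='haarcascade_frontalface_alt2.xml',
                        help="Haar-cascade XML file used by OpenCV and the visualiser (assumed filename)")
    parser.add_argument('--output', default='saves',
                        help="Existing directory in which to store snapshots (assumed default)")
    parser.add_argument('--visualhaar-lib', default=None,
                        help="Optional explicit path to the visual haarcascades library")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)
    main(args.camera, args.rotate, args.fullscreen, args.cascade, args.output, args.visualhaar_lib)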