#!/usr/bin/env python
import cv2
import dlib
import numpy as np
import os
import pickle
import logging
from scipy.ndimage import gaussian_filter
from PIL import Image, ImageDraw, ImageTk
import pandas as pd
import seaborn as sns
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from matplotlib import cm
import sys

if sys.version_info[0] < 3:
    import Tkinter as Tk
    import Queue
else:
    import tkinter as Tk
    import queue as Queue

import time
import datetime
import coloredlogs
import argparse
import multiprocessing

argParser = argparse.ArgumentParser(description='Draw a heatmap')
argParser.add_argument(
    '--camera',
    '-c',
    default=0,
    type=int,
    help='The id of the camera'
)
argParser.add_argument(
    '--verbose',
    '-v',
    action="store_true",
    help="Enable debug logging"
)
argParser.add_argument(
    '--hide-graph',
    action="store_true",
    help="Don't show the kde graph window"
)
argParser.add_argument(
    '--hide-preview',
    action="store_true",
    help="Don't show the webcam preview window"
)
argParser.add_argument(
    '--output-dir',
    '-o',
    help="Directory in which to store a heatmap frame every save interval",
)
argParser.add_argument(
    '--save-interval',
    type=int,
    default=15,
    help="Interval at which to save heatmap frames (in seconds)"
)
argParser.add_argument(
    '--queue-length',
    type=int,
    default=0,
    help="Number of frames to keep in queue (adds a delay)"
)
argParser.add_argument(
    '--processes',
    type=int,
    default=4,
    help="Total number of processes (min 3)"
)
args = argParser.parse_args()

coloredlogs.install(
    level=logging.DEBUG if args.verbose else logging.INFO,
    # format='%(asctime)-15s %(name)s %(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)

# im = cv2.imread("headPose.jpg");

predictor_path = "shape_predictor_68_face_landmarks.dat"

if args.output_dir:
    lastMetricsFilename = os.path.join(args.output_dir, 'last_metrics.p')
else:
    lastMetricsFilename = None

screenDrawCorners = np.array([[10, 60], [90, 60], [10, 110], [90, 110]])

# metrics matrix (only the last assignment is used)
metricsSize = [1920, 1080]
metricsSize = [1280, 800]
metricsSize = [960, 600]
dataframe = pd.DataFrame(columns=['x', 'y'])

renderSize = [1280, 800]

metrics = None
if lastMetricsFilename and os.path.isfile(lastMetricsFilename):
    try:
        with open(lastMetricsFilename, "rb") as fp:
            metrics = pickle.load(fp)
        logger.warning("Loaded metrics from {}".format(lastMetricsFilename))
    except Exception as e:
        logger.exception(e)
if metrics is None:
    metrics = np.zeros((metricsSize[1], metricsSize[0]))  # (y, x)
    logger.warning("New metrics")

screenDrawCorners = np.array([[0, 0], [1919, 0], [0, 1079], [1919, 1079]])


def create_perspective_transform_matrix(src, dst):
    """ Creates a perspective transformation matrix which transforms points
        in quadrilateral ``src`` to the corresponding points on quadrilateral
        ``dst``.

        Will raise a ``np.linalg.LinAlgError`` on invalid input.
    """
    # See:
    # * http://xenia.media.mit.edu/~cwren/interpolator/
    # * http://stackoverflow.com/a/14178717/71522
    in_matrix = []
    for (x, y), (X, Y) in zip(src, dst):
        in_matrix.extend([
            [x, y, 1, 0, 0, 0, -X * x, -X * y],
            [0, 0, 0, x, y, 1, -Y * x, -Y * y],
        ])

    A = np.matrix(in_matrix, dtype=float)
    B = np.array(dst).reshape(8)
    af = np.dot(np.linalg.inv(A.T * A) * A.T, B)
    m = np.append(np.array(af).reshape(8), 1).reshape((3, 3))
    logger.info("Created transform matrix: src {} dst {} m {}".format(
        src, dst, m))
    return m
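
# A minimal sanity check for the solver above (added sketch; the helper name
# is illustrative and it is not called anywhere). The docstring example below
# maps a 10x10 square onto the 50x50 square at (50, 50), so the matrix should
# send the centre (5, 5) to (75, 75).
def _check_transform_matrix():
    src = [(0, 0), (10, 0), (10, 10), (0, 10)]
    dst = [(50, 50), (100, 50), (100, 100), (50, 100)]
    m = create_perspective_transform_matrix(src, dst)
    p = np.dot(m, (5, 5, 1))  # homogeneous coordinates
    p = p / p[2]              # perspective divide
    assert np.allclose(p[:2], (75, 75))
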
# got this amazing thing from here: https://stackoverflow.com/a/24088499
def create_perspective_transform(src, dst, round=False, splat_args=False):
    """ Returns a function which will transform points in quadrilateral
        ``src`` to the corresponding points on quadrilateral ``dst``::

            >>> transform = create_perspective_transform(
            ...     [(0, 0), (10, 0), (10, 10), (0, 10)],
            ...     [(50, 50), (100, 50), (100, 100), (50, 100)],
            ... )
            >>> transform((5, 5))
            (74.99999999999639, 74.999999999999957)

        If ``round`` is ``True`` then points will be rounded to the nearest
        integer and integer values will be returned.

            >>> transform = create_perspective_transform(
            ...     [(0, 0), (10, 0), (10, 10), (0, 10)],
            ...     [(50, 50), (100, 50), (100, 100), (50, 100)],
            ...     round=True,
            ... )
            >>> transform((5, 5))
            (75, 75)

        If ``splat_args`` is ``True`` the function will accept two arguments
        instead of a tuple.

            >>> transform = create_perspective_transform(
            ...     [(0, 0), (10, 0), (10, 10), (0, 10)],
            ...     [(50, 50), (100, 50), (100, 100), (50, 100)],
            ...     splat_args=True,
            ... )
            >>> transform(5, 5)
            (74.99999999999639, 74.999999999999957)

        If the input values yield an invalid transformation matrix an identity
        function will be returned and the ``error`` attribute will be set to a
        description of the error::

            >>> transform = create_perspective_transform(
            ...     np.zeros((4, 2)),
            ...     np.zeros((4, 2)),
            ... )
            >>> transform((5, 5))
            (5.0, 5.0)
            >>> transform.error
            'invalid input quads (...): Singular matrix'
    """
    try:
        transform_matrix = create_perspective_transform_matrix(src, dst)
        error = None
    except np.linalg.LinAlgError as e:
        transform_matrix = np.identity(3, dtype=float)
        error = "invalid input quads (%s and %s): %s" % (src, dst, e)
        error = error.replace("\n", "")

    # build the transform function dynamically so splat_args can change its
    # signature; the exec() call form works on both Python 2 and 3 (the
    # original used the Python 2 exec statement)
    to_eval = "def perspective_transform(%s):\n" % (
        splat_args and "*pt" or "pt",
    )
    to_eval += "    res = np.dot(transform_matrix, ((pt[0], ), (pt[1], ), (1, )))\n"
    to_eval += "    res = res / res[2]\n"
    if round:
        to_eval += "    return (int(round(res[0][0])), int(round(res[1][0])))\n"
    else:
        to_eval += "    return (res[0][0], res[1][0])\n"
    namespace = {
        "transform_matrix": transform_matrix,
    }
    namespace.update(globals())
    exec(to_eval, namespace, namespace)
    res = namespace["perspective_transform"]
    res.matrix = transform_matrix
    res.error = error
    return res


def coordinatesToSrc(coordinates):
    return np.array([coordinates['tl'], coordinates['tr'], coordinates['bl'], coordinates['br']])


# coordinates of the screen boundaries
if os.path.exists("coordinates.p"):
    with open("coordinates.p", "rb") as fp:
        coordinates = pickle.load(fp)
    transform = create_perspective_transform(coordinatesToSrc(coordinates), screenDrawCorners)

    a = [np.array([1312.15541183]), np.array([244.56278002]), 0]
    logger.info("Loaded coordinates: %s", coordinatesToSrc(coordinates))
    logger.debug("Corners: %s", screenDrawCorners)
    logger.debug("Test point %s", a)
    logger.debug("Transformed point %s", transform(a[0:2]))
    # exit()
else:
    coordinates = {'tl': None, 'tr': None, 'bl': None, 'br': None}
    transform = None

if not args.hide_graph:
    windowRoot = Tk.Toplevel()
    windowSize = (1000, 1000)
    windowRoot.geometry('%dx%d+%d+%d' % (windowSize[0], windowSize[1], 0, 0))
    figure = Figure(figsize=(16, 9), dpi=100)
    axes = figure.add_subplot(111)
    axes.set_title('Tk embedding')
    axes.set_xlabel('X axis label')
    axes.set_ylabel('Y label')
    # canvas = Tk.Canvas(windowRoot, width=1000, height=1000)
    canvas = FigureCanvasTkAgg(figure, master=windowRoot)
    canvas.draw()
    canvas.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=1)

imageWindowRoot = Tk.Toplevel()
imageWindowSize = tuple(renderSize)
imageWindowRoot.geometry('%dx%d+%d+%d' % (imageWindowSize[0], imageWindowSize[1], 0, 0))
imageWindowRoot.attributes("-fullscreen", True)
# imageCanvas is where the heatmap image is drawn
imageCanvas = Tk.Canvas(imageWindowRoot, width=renderSize[0], height=renderSize[1])
imageCanvas.pack()
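
# Three output surfaces are set up by this point: the optional matplotlib
# graph embedded in a Tk window (disabled with --hide-graph), the Tk image
# canvas above (its drawing code is currently commented out in the main
# loop), and the fullscreen OpenCV window "test" created below, which is
# where the rendered heatmap is shown.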
cv2.namedWindow("test", cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty("test", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

if args.output_dir:
    startTime = time.time()
    lastSaveTime = startTime
if args.queue_length:
    imageQueue = []

lock = multiprocessing.Lock()

photoQueue = multiprocessing.Queue(maxsize=args.processes)
pointsQueue = multiprocessing.Queue(maxsize=args.processes)


def captureFacesPoints(i):
    logger.info("Start capturer {}".format(i))
    # dedicated detector & predictor instances:
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(predictor_path)

    while True:
        t1 = time.time()
        im = photoQueue.get(block=True, timeout=10)
        if im is None:
            continue
        logger.debug("Got photo in {}".format(i))
        size = im.shape
        t2 = time.time()
        logger.debug("Captured frame in %fs", t2 - t1)

        # Docs: Ask the detector to find the bounding boxes of each face. The 1 in the
        # second argument indicates that we should upsample the image 1 time. This
        # will make everything bigger and allow us to detect more faces.
        dets = detector(im, 1)
        t3 = time.time()
        logger.debug("Number of faces detected: {} - took {}s".format(len(dets), t3 - t2))

        # We use this later for calibrating
        currentPoint = None
        currentPoints = []

        if len(dets) > 0:
            for d in dets:
                td1 = time.time()
                shape = predictor(im, d)
                td2 = time.time()
                logger.debug("Found face points in %fs", td2 - td1)

                # 2D image points. If you change the image, you need to change vector
                image_points = np.array([
                    (shape.part(30).x, shape.part(30).y),  # Nose tip
                    (shape.part(8).x, shape.part(8).y),    # Chin
                    (shape.part(36).x, shape.part(36).y),  # Left eye left corner
                    (shape.part(45).x, shape.part(45).y),  # Right eye right corner
                    (shape.part(48).x, shape.part(48).y),  # Left mouth corner
                    (shape.part(54).x, shape.part(54).y)   # Right mouth corner
                ], dtype="double")

                # 3D model points.
                model_points = np.array([
                    (0.0, 0.0, 0.0),           # Nose tip
                    (0.0, -330.0, -65.0),      # Chin
                    (-225.0, 170.0, -135.0),   # Left eye left corner
                    (225.0, 170.0, -135.0),    # Right eye right corner
                    (-150.0, -150.0, -125.0),  # Left mouth corner
                    (150.0, -150.0, -125.0)    # Right mouth corner
                ])

                # Camera internals
                focal_length = size[1]
                center = (size[1] / 2, size[0] / 2)
                camera_matrix = np.array(
                    [[focal_length, 0, center[0]],
                     [0, focal_length, center[1]],
                     [0, 0, 1]], dtype="double"
                )
                # logger.info("Camera Matrix :\n {0}".format(camera_matrix))

                dist_coeffs = np.zeros((4, 1))  # Assuming no lens distortion
                (success, rotation_vector, translation_vector) = cv2.solvePnP(
                    model_points, image_points, camera_matrix, dist_coeffs,
                    flags=cv2.SOLVEPNP_ITERATIVE)

                if not success:
                    logger.info("Error determining PnP {}".format(success))
                    continue

                logger.debug("Rotation Vector:\n %s", rotation_vector)
                logger.debug("Translation Vector:\n {0}".format(translation_vector))
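
                # solvePnP recovers the head pose relative to the camera: an
                # axis-angle rotation vector plus a translation in the same
                # (roughly millimetre) units as model_points. projectPoints
                # below pushes a 3D point through that pose and the camera
                # intrinsics to find where it lands in the image.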
                # Project a 3D point (0, 0, 1000.0) onto the image plane.
                # We use this to draw a line sticking out of the nose
                (nose_end_point2D, jacobian) = cv2.projectPoints(
                    np.array([(0.0, 0.0, 1000.0)]), rotation_vector,
                    translation_vector, camera_matrix, dist_coeffs)

                for p in image_points:
                    cv2.circle(im, (int(p[0]), int(p[1])), 3, (0, 0, 255), -1)

                p1 = (int(image_points[0][0]), int(image_points[0][1]))
                p2 = (int(nose_end_point2D[0][0][0]), int(nose_end_point2D[0][0][1]))

                cv2.line(im, p1, p2, (255, 0, 0), 2)

                rotMatrix = np.zeros([3, 3])
                cv2.Rodrigues(rotation_vector, rotMatrix, jacobian=0)

                # Find rotation: https://stackoverflow.com/a/15029416
                # not used anymore :-)
                # rx = np.arctan2(rotMatrix[2,1], rotMatrix[2,2])
                # ry = np.arctan2(-rotMatrix[2,0], np.sqrt(np.square(rotMatrix[2,1]) + np.square(rotMatrix[2,2])))
                # rz = np.arctan2(rotMatrix[1,0], rotMatrix[0,0])
                # logger.info("rotation {} {} {}".format(rx, ry, rz))
                # ry = - np.arcsin(rotMatrix[0,2])
                # rx = np.arctan2(rotMatrix[1,2]/np.cos(ry), rotMatrix[2,2]/np.cos(ry))
                # rz = np.arctan2(rotMatrix[0,1]/np.cos(ry), rotMatrix[0,0]/np.cos(ry))
                # logger.info("rotation ml {} {} {}".format(rx, ry, rz))  # seems better?

                viewDirectionVector = np.dot(np.array([0.0, 0.0, 1.0]), rotMatrix)

                if not args.hide_preview:
                    # draw little floorplan for x: 10 -> 50 maps to z: 0 -> 10000, x: -2000 -> 2000
                    mapPosX = int((translation_vector[0] + 500) / 1000 * 40)
                    mapPosY = int((translation_vector[1] + 500) / 1000 * 40)
                    mapPosZ = int((translation_vector[2] + 0) / 10000 * 40)
                    cv2.circle(im, (mapPosZ + 10, mapPosX + 10), 2, (0, 0, 255), -1)
                    cv2.circle(im, (mapPosZ + 60, mapPosY + 10), 2, (0, 0, 255), -1)

                    # make it an _amazing_ stick figurine for the side view
                    cv2.line(im, (mapPosZ + 60, mapPosY + 10), (mapPosZ + 60, mapPosY + 20), (0, 0, 255), 1)
                    cv2.line(im, (mapPosZ + 60, mapPosY + 20), (mapPosZ + 55, mapPosY + 25), (0, 0, 255), 1)
                    cv2.line(im, (mapPosZ + 60, mapPosY + 20), (mapPosZ + 65, mapPosY + 25), (0, 0, 255), 1)
                    cv2.line(im, (mapPosZ + 60, mapPosY + 15), (mapPosZ + 55, mapPosY + 10), (0, 0, 255), 1)
                    cv2.line(im, (mapPosZ + 60, mapPosY + 15), (mapPosZ + 65, mapPosY + 10), (0, 0, 255), 1)

                    # draw rotation vector
                    cv2.circle(im, (mapPosZ + 60, mapPosY + 10), 2, (0, 0, 255), -1)
                    cv2.line(im, (mapPosZ + 10, mapPosX + 10),
                             (mapPosZ + 10 + int(viewDirectionVector[2] * 100),
                              mapPosX + 10 + int(viewDirectionVector[0] * 100)),
                             (255, 255, 0), 1)
                    cv2.line(im, (mapPosZ + 60, mapPosY + 10),
                             (mapPosZ + 60 + int(viewDirectionVector[2] * 100),
                              mapPosY + 10 - int(viewDirectionVector[1] * 100)),
                             (255, 0, 255), 1)

                # Translation vector gives position in space:
                # x, y, z: 0,0,0 is the center of the camera
                # line: (x,y,z) = f(a) = (t1 + r1*a, t2 + r2*a, t3 + r3*a)
                # screen: (x,y,z) = (x,y,0)
                # intersection:
                #   x = t1 + r1 * a
                #   y = t2 + r2 * a
                #   z = t3 + r3 * a = 0  =>  a = -t3 / r3
                # substitute found a in x,y
                # seems to be wrong?
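                # NB: the raw rotation vector is an axis-angle encoding, not
                # a direction, which is presumably why the first attempt
                # below was off. viewDirectionVector (the unit z-axis pushed
                # through the rotation matrix) is the head's facing
                # direction. Note that the active code also leaves out the
                # "/ r3" division from the derivation above, so a is only
                # proportional to the true line parameter.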
                # a = - translation_vector[2]  # / rotation_vector[2]
                # x = translation_vector[0] + rotation_vector[0] * a
                # y = translation_vector[1] + rotation_vector[1] * a
                # logger.warn("First {} {},{}".format(a, x, y))

                a = - translation_vector[2]  # / viewDirectionVector[2]
                x = translation_vector[0] + viewDirectionVector[0] * a
                y = translation_vector[1] + viewDirectionVector[1] * a
                # logger.warn("Second {} {},{}".format(a, x, y))

                point = np.array([x, y])
                currentPoint = point
                currentPoints.append(point)

            td3 = time.time()
            logger.debug("Timer: All other face drawing stuff in %fs", td3 - td2)

        # TODO only draw nose line now, so we can change color depending whether on screen or not
        results = {'currentPoint': currentPoint, 'currentPoints': currentPoints, 'im': im}
        try:
            pointsQueue.put_nowait(results)
        except Queue.Full as e:
            logger.critical("Result queue full?")
        # not applicable to multiprocessing.Queue in py2.7: photoQueue.task_done()


def captureVideo():
    c = cv2.VideoCapture(args.camera)
    # set camera resolution
    c.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    c.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    logger.debug("Camera FPS: {}".format(c.get(cv2.CAP_PROP_FPS)))
    while True:
        _, im = c.read()
        try:
            photoQueue.put_nowait(im)
        except Queue.Full as e:
            logger.debug("Photo queue full")
            time.sleep(.05)
        logger.debug("Queue sizes: image: {}, points: {}".format(photoQueue.qsize(), pointsQueue.qsize()))


# processes - 2 detector workers: one process grabs frames, the main process renders
processes = []
for i in range(args.processes - 2):
    p = multiprocessing.Process(target=captureFacesPoints, args=(i,))
    p.daemon = True
    p.start()
    processes.append(p)

p = multiprocessing.Process(target=captureVideo, args=())
p.daemon = True
p.start()
processes.append(p)

while True:
    te1 = time.time()
    result = pointsQueue.get()
    im = result['im']
    currentPoint = result['currentPoint']
    currentPoints = result['currentPoints']

    if not args.hide_preview:
        # draw little floorplan for 10 -> 50, sideplan 60 -> 100 (40x40 px)
        cv2.rectangle(im, (9, 9), (51, 51), (255, 255, 255), 1)
        cv2.rectangle(im, (59, 9), (101, 51), (255, 255, 255), 1)
        cv2.line(im, (10, 10), (10, 50), (200, 200, 200), 2)
        cv2.line(im, (60, 10), (60, 50), (200, 200, 200), 2)
        # screen is 16:10
        cv2.rectangle(im, (9, 59), (91, 111), (255, 255, 255), 1)

    if transform is None:
        if not args.hide_preview:
            cv2.putText(im, "1", (10, 70), cv2.FONT_HERSHEY_PLAIN, .7, (255, 255, 255) if coordinates['tl'] is not None else (0, 0, 255))
            cv2.putText(im, "2", (85, 70), cv2.FONT_HERSHEY_PLAIN, .7, (255, 255, 255) if coordinates['tr'] is not None else (0, 0, 255))
            cv2.putText(im, "3", (10, 110), cv2.FONT_HERSHEY_PLAIN, .7, (255, 255, 255) if coordinates['bl'] is not None else (0, 0, 255))
            cv2.putText(im, "4", (85, 110), cv2.FONT_HERSHEY_PLAIN, .7, (255, 255, 255) if coordinates['br'] is not None else (0, 0, 255))
        tm1 = 0
        tm2 = 0
        tm3 = 0
        tm4 = 0
    else:
        tm1 = time.time()
        newMetrics = np.zeros((metricsSize[1], metricsSize[0]))
        tm2 = time.time()
        for point in currentPoints:
            # check if within coordinates:
            # dot1 = np.dot(coordinates['tl'] - point, coordinates['tl'] - coordinates['br'])
            # dot2 = np.dot(coordinates['bl'] - point, coordinates['tl'] - coordinates['br'])
            # pointIn3 = [point[0], point[1], 0]
            # targetPoint = np.dot(pointIn3, transformationMatrix)
            # logger.info("Looking at", pointIn3, np.dot(transformationMatrix, pointIn3))
            targetPoint = transform(point)
            logger.info("Looking at {} {}".format(point, targetPoint))
            # cv2.circle(im, (int(targetPoint[0]), int(targetPoint[1])), 2, (0,255,0), -1)
            # from 1920x1080 to 80x50
            miniTargetPoint = (int(targetPoint[0] / 1920 * 80 + 10), int(targetPoint[1] / 1080 * 50 + 60))
            cv2.circle(im, miniTargetPoint, 2, (0, 255, 0), -1)
            targetInt = (int(targetPoint[0]), int(targetPoint[1]))
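            # transform maps the intersection point into the 1920x1080 space
            # defined by screenDrawCorners; note that the bounds check below
            # clips against the smaller metricsSize grid, so points beyond
            # 960x600 are discarded.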
            # check if point fits on screen:
            # if so, measure it
            if targetInt[0] >= 0 and targetInt[1] >= 0 and targetInt[0] < metricsSize[0] and targetInt[1] < metricsSize[1]:
                # DataFrame.append was removed in pandas 2.0; concat is the equivalent
                dataframe = pd.concat([dataframe, pd.DataFrame([{'x': targetInt[0], 'y': targetInt[1]}])], ignore_index=True)
                logger.debug("Put metric {},{} in matrix of {},{}".format(targetInt[1], targetInt[0], metricsSize[1], metricsSize[0]))
                newMetrics[targetInt[1], targetInt[0]] += 1
                # TODO: put in an image of a blurred spot & remove blur action

        # after we collected all new metrics, blur them for smoothness
        # and add to all metrics collected
        tm3 = time.time()
        # metrics = metrics + gaussian_filter(newMetrics, sigma=13)
        metrics = metrics + newMetrics
        tm4 = time.time()
        logger.debug("Updated matrix with blur in %f", tm4 - tm3 + tm2 - tm1)

    # Display webcam image with overlays
    te2 = time.time()
    if not args.hide_preview:
        cv2.imshow("Output", im)
    te3 = time.time()
    logger.debug("showed webcam image in %fs", te3 - te2)
    logger.debug("Rendering took %fs", te3 - te1)

    # blur smooth the heatmap
    # logger.debug("Max blurred metrics: %f", np.max(metrics))

    # update the heatmap output
    tm21 = time.time()
    # smooth impact of first hits by dividing by at least 0.02
    normalisedMetrics = metrics / (max(.02, np.max(metrics)))
    # convert to colormap, thanks to: https://stackoverflow.com/a/10967471
    # (the colormap output is RGBA, hence RGBA2BGR)
    normalisedMetricsColored = np.uint8(cm.nipy_spectral(normalisedMetrics) * 255)
    normalisedMetricsColoredBGR = cv2.cvtColor(normalisedMetricsColored, cv2.COLOR_RGBA2BGR)
    tm22 = time.time()
    logger.debug("Max normalised metrics: %f", np.max(normalisedMetrics))
    # logger.info(normalisedMetrics)

    tm23 = time.time()
    cv2.imshow("test", normalisedMetricsColoredBGR)
    # image = Image.fromarray(normalisedMetricsColored)
    # wpercent = (imageWindowSize[0] / float(image.size[0]))
    # hsize = int((float(image.size[1]) * float(wpercent)))
    # renderImage = image.resize((renderSize[0], renderSize[1]))
    # print(renderImage.size, "lala")
    # if args.queue_length:
    #     imageQueue.append(image)
    #     if len(imageQueue) > args.queue_length:
    #         logger.warning("Use image from queue :-)")
    #         image = imageQueue.pop(0)
    # tkpi = ImageTk.PhotoImage(renderImage)
    # imageCanvas.delete("IMG")
    # imagesprite = imageCanvas.create_image(renderSize[0]/2, renderSize[1]/2, image=tkpi, tags="IMG")
    # imageWindowRoot.update()
    tm24 = time.time()
    logger.debug("PIL image generated in %fs", tm24 - tm23)
    logger.debug("Total matrix time is %fs", tm4 - tm3 + tm2 - tm1 + tm24 - tm21)

    if not args.hide_graph:
        te4 = time.time()
        axes.clear()
        if len(dataframe) > 2:
            # newer seaborn releases want keyword x/y arguments, levels=
            # instead of n_levels=, and fill= instead of the removed shade=
            g = sns.kdeplot(x=dataframe['x'], y=dataframe['y'], ax=axes,
                            levels=30, fill=True, cmap=cm.rainbow)
        canvas.draw()
        windowRoot.update()
        te5 = time.time()
        logger.debug("Drew graph & updated window in %fs", te5 - te4)

    if args.output_dir:
        # save output to dir
        now = tm24  # time.time()
        if now - lastSaveTime > args.save_interval:
            filename = os.path.join(
                args.output_dir,
                "frame{}.png".format(
                    datetime.datetime.now().replace(microsecond=0).isoformat()
                )
            )
            cv2.imwrite(filename, normalisedMetricsColoredBGR)
            # image.save(filename)
            with open(lastMetricsFilename, 'wb') as fp:
                pickle.dump(metrics, fp)
            logger.debug("Saved frame to {}".format(filename))
            lastSaveTime = now

    # (optionally) very slowly fade out previous metrics:
    metrics = metrics * .9997
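
    # Key bindings: ESC quits, 'd' switches the logger to debug level. While
    # a face is detected, '1'..'4' store the current gaze point as the
    # top-left/top-right/bottom-left/bottom-right screen corner (saved to
    # coordinates.p once all four are set), and 't' logs a test transform of
    # the current point when calibration is complete.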
    keyPress = cv2.waitKey(5)
    if keyPress == 27:
        break
    elif keyPress == ord('d'):
        logger.setLevel(logging.DEBUG)
    elif keyPress > -1 and currentPoint is not None:
        recalculate = False
        if keyPress == ord('1'):
            coordinates['tl'] = currentPoint
            recalculate = True
        elif keyPress == ord('2'):
            coordinates['tr'] = currentPoint
            recalculate = True
        elif keyPress == ord('3'):
            coordinates['bl'] = currentPoint
            recalculate = True
        elif keyPress == ord('4'):
            coordinates['br'] = currentPoint
            recalculate = True
        elif keyPress == ord('t') and transform is not None:
            logger.info("Coordinates {}".format(coordinates))
            logger.info("Drawing area {}".format(screenDrawCorners))
            logger.info("Test point {}".format(currentPoint))
            logger.info("Transformed point {}".format(transform(currentPoint)))
        if recalculate is True and not any(x is None for x in coordinates.values()):
            logger.debug(coordinates.values())
            with open("coordinates.p", "wb") as fp:
                pickle.dump(coordinates, fp)
            logger.info("Saved coordinates")
            transform = create_perspective_transform(coordinatesToSrc(coordinates), screenDrawCorners)

    duration = time.time() - te1
    fps = 1 / duration
    logger.info("Rendering loop %fs %ffps", duration, fps)

cv2.destroyAllWindows()
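
# Example invocation (illustrative; assumes this file is saved as heatmap.py
# and that shape_predictor_68_face_landmarks.dat, available from dlib.net,
# sits in the working directory):
#
#   python heatmap.py --camera 0 --processes 4 --output-dir ./frames --save-interval 15
#
# Look at each screen corner and press 1-4 to calibrate; ESC quits.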