import numpy as np import os import pickle import logging from scipy.ndimage.filters import gaussian_filter from PIL import Image, ImageDraw,ImageTk from matplotlib import cm import sys if sys.version_info[0] < 3: import Tkinter as Tk else: import tkinter as Tk import time import argparse import subprocess import json import termios, fcntl, os class Heatmap: def __init__(self, metricsSize, logger, coordinates_filename): self.coordinates_filename = coordinates_filename self.logger = logger self.metricsSize = metricsSize self.metrics = np.zeros((metricsSize[1], metricsSize[0])) # (y, x) self.screenDrawCorners = np.array([ [0,0], [metricsSize[0]-1,0], [0, metricsSize[1]-1], [metricsSize[0]-1,metricsSize[1]-1] ]) self.loadCoordinates() self.windowRoot = Tk.Toplevel() imageWindowSize = tuple(metricsSize) self.windowRoot.geometry('%dx%d+%d+%d' % (imageWindowSize[0],imageWindowSize[1],0,0)) self.canvas = Tk.Canvas(self.windowRoot,width=imageWindowSize[0],height=imageWindowSize[1]) self.canvas.pack() self.updateWindow() def updateFromJson(self, frame): self.logger.info("Received %s", frame) newMetrics = np.zeros((self.metricsSize[1], self.metricsSize[0])) for face in frame: # {u'confidence': 0.983333, u'head_rot': [0.270533, -0.0669274, 0.113554], u'gaze_angle': [0.025313, 0.403179], u'fid': 0, u'head_pos': [73.5302, 26.4475, 399.764]} x, y = self.getTargetOfFace(face) self.logger.debug("Face %d on %s", face['fid'], [x,y]) targetPoint = self.transform(np.array([x,y])) self.logger.info("Looking at %s", targetPoint) targetInt = (int(targetPoint[0]), int(targetPoint[1])) # check if point fits on screen: # if so, measure it if targetInt[0] >= 0 and targetInt[1] >= 0 and targetInt[0] < self.metricsSize[1] and targetInt[1] < self.metricsSize[0]: newMetrics[targetInt[1],targetInt[0]] += float(face['confidence']) self.metrics = self.metrics + gaussian_filter(newMetrics, sigma = 8) self.updateWindow() def updateWindow(self): normalisedMetrics = self.metrics / (np.max(self.metrics)) # convert to colormap, thanks to: https://stackoverflow.com/a/10967471 normalisedMetrics = np.uint8(cm.plasma(normalisedMetrics)*255) image = Image.fromarray(normalisedMetrics) wpercent = (self.metricsSize[0] / float(image.size[0])) hsize = int((float(image.size[1]) * float(wpercent))) image = image.resize((self.metricsSize[0], hsize)) tkpi = ImageTk.PhotoImage(image) self.canvas.delete("IMG") imagesprite = self.canvas.create_image(500,500,image=tkpi, tags="IMG") self.windowRoot.update() def getTargetOfFace(self, face): x = np.arctan(face['gaze_angle'][0])*face['head_pos'][2] + face['head_pos'][0] y = np.arctan(face['gaze_angle'][1])*face['head_pos'][2] + face['head_pos'][1] return (x,y) def loadCoordinates(self): # coordinates of the screen boundaries if os.path.exists(self.coordinates_filename): self.coordinates = pickle.load(open(self.coordinates_filename, "rb")) else: self.coordinates = {'tl': None, 'tr': None, 'bl': None, 'br': None} self.updateTransform() def saveCoordinates(self): self.logger.debug(self.coordinates.values()) pickle.dump(self.coordinates, open( self.coordinates_filename, "wb" ) ) self.logger.info("Saved coordinates to %s", self.coordinates_filename) def updateTransform(self): if self.hasAllCoordinates() : self.transform = create_perspective_transform(coordinatesToSrc(self.coordinates), self.screenDrawCorners) else: self.transform = None self.logger.debug("Corners: %s", self.screenDrawCorners) def hasAllCoordinates(self): return not any (x is None for x in self.coordinates.values()) def setCoordinate(self, pos, face): self.coordinates[pos] = self.getTargetOfFace(face) if self.hasAllCoordinates(): self.saveCoordinates() self.updateTransform() def main(openface_exec, coordinates_filename, device=0): logging.basicConfig( format='%(asctime)-15s %(name)s %(levelname)s: %(message)s' ) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) fd = sys.stdin.fileno() oldterm = termios.tcgetattr(fd) newattr = termios.tcgetattr(fd) newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO termios.tcsetattr(fd, termios.TCSANOW, newattr) oldflags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK) try: # metrics matrix metricsSize = [1920,1080] heatmap = Heatmap(metricsSize, logger, coordinates_filename) for output in execute([openface_exec, "-device", str(device), "-cam_width", "1280", "-cam_height", "720"]): try: frame = json.loads(output) heatmap.updateFromJson(frame) try: c = sys.stdin.read(1) c = int(c) if c == 1: heatmap.setCoordinate("tl", frame[0]) elif c == 2: heatmap.setCoordinate("tr", frame[0]) elif c == 3: heatmap.setCoordinate("bl", frame[0]) elif c == 4: heatmap.setCoordinate("br", frame[0]) except IOError: pass except Exception as e: logger.warning(str(e)) logger.warning("received %s", output) finally: termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm) fcntl.fcntl(fd, fcntl.F_SETFL, oldflags) # thanks to https://stackoverflow.com/a/4417735 def execute(cmd): popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True) for stdout_line in iter(popen.stdout.readline, ""): yield stdout_line popen.stdout.close() return_code = popen.wait() if return_code: raise subprocess.CalledProcessError(return_code, cmd) def create_perspective_transform_matrix(src, dst): """ Creates a perspective transformation matrix which transforms points in quadrilateral ``src`` to the corresponding points on quadrilateral ``dst``. Will raise a ``np.linalg.LinAlgError`` on invalid input. """ # See: # * http://xenia.media.mit.edu/~cwren/interpolator/ # * http://stackoverflow.com/a/14178717/71522 in_matrix = [] for (x, y), (X, Y) in zip(src, dst): in_matrix.extend([ [x, y, 1, 0, 0, 0, -X * x, -X * y], [0, 0, 0, x, y, 1, -Y * x, -Y * y], ]) A = np.matrix(in_matrix, dtype=np.float) B = np.array(dst).reshape(8) af = np.dot(np.linalg.inv(A.T * A) * A.T, B) m = np.append(np.array(af).reshape(8), 1).reshape((3, 3)) logging.getLogger(__name__).info("Created transformmatrix: src %s dst %s m %s", src, dst, m) return m # got this amazing thing from here: https://stackoverflow.com/a/24088499 def create_perspective_transform(src, dst, round=False, splat_args=False): """ Returns a function which will transform points in quadrilateral ``src`` to the corresponding points on quadrilateral ``dst``:: >>> transform = create_perspective_transform( ... [(0, 0), (10, 0), (10, 10), (0, 10)], ... [(50, 50), (100, 50), (100, 100), (50, 100)], ... ) >>> transform((5, 5)) (74.99999999999639, 74.999999999999957) If ``round`` is ``True`` then points will be rounded to the nearest integer and integer values will be returned. >>> transform = create_perspective_transform( ... [(0, 0), (10, 0), (10, 10), (0, 10)], ... [(50, 50), (100, 50), (100, 100), (50, 100)], ... round=True, ... ) >>> transform((5, 5)) (75, 75) If ``splat_args`` is ``True`` the function will accept two arguments instead of a tuple. >>> transform = create_perspective_transform( ... [(0, 0), (10, 0), (10, 10), (0, 10)], ... [(50, 50), (100, 50), (100, 100), (50, 100)], ... splat_args=True, ... ) >>> transform(5, 5) (74.99999999999639, 74.999999999999957) If the input values yield an invalid transformation matrix an identity function will be returned and the ``error`` attribute will be set to a description of the error:: >>> tranform = create_perspective_transform( ... np.zeros((4, 2)), ... np.zeros((4, 2)), ... ) >>> transform((5, 5)) (5.0, 5.0) >>> transform.error 'invalid input quads (...): Singular matrix """ try: transform_matrix = create_perspective_transform_matrix(src, dst) error = None except np.linalg.LinAlgError as e: transform_matrix = np.identity(3, dtype=np.float) error = "invalid input quads (%s and %s): %s" %(src, dst, e) error = error.replace("\n", "") to_eval = "def perspective_transform(%s):\n" %( splat_args and "*pt" or "pt", ) to_eval += " res = np.dot(transform_matrix, ((pt[0], ), (pt[1], ), (1, )))\n" to_eval += " res = res / res[2]\n" if round: to_eval += " return (int(round(res[0][0])), int(round(res[1][0])))\n" else: to_eval += " return (res[0][0], res[1][0])\n" locals = { "transform_matrix": transform_matrix, } locals.update(globals()) exec to_eval in locals, locals res = locals["perspective_transform"] res.matrix = transform_matrix res.error = error return res def coordinatesToSrc(coordinates): return np.array([coordinates['tl'], coordinates['tr'],coordinates['bl'], coordinates['br']]) if __name__ == '__main__': parser = argparse.ArgumentParser(description='launch modified OpenFace instance & create heatmap.') parser.add_argument('--of', default="../build/bin/FaceLandmarkVidMulti", help='The modified version of OpenFace\'s FaceLandmarkVidMulti') parser.add_argument('--coordinates', default="coordinates.p", help='Use a specific coordinates.p file') parser.add_argument('--device', type=int, default=0, help='Webcam device nr. to use') args = parser.parse_args() main(openface_exec=args.of, coordinates_filename=args.coordinates, device=args.device)