"""Launch a modified OpenFace instance and draw a gaze heatmap from its output.

The script starts the OpenFace executable as a subprocess, parses one JSON
frame per output line, projects every detected face's gaze onto a calibrated
screen quadrilateral and accumulates the hits in a heatmap shown with Tkinter.
"""
import argparse
import fcntl
import json
import logging
import os
import pickle
import subprocess
import sys
import termios
import time
from threading import Thread

import numpy as np
from matplotlib import cm
from PIL import Image, ImageDraw, ImageTk

if sys.version_info[0] < 3:
    import Tkinter as Tk
else:
    import tkinter as Tk


class Heatmap:
    """Accumulates gaze targets in a matrix and renders it as a heatmap.

    Two Tk windows are created: a fullscreen window showing the heatmap and a
    second "map" window showing head positions and gaze lines relative to the
    screen.
    """

    # Number keys that assign the current gaze target to a screen corner.
    CORNER_KEYS = {"1": "tl", "2": "tr", "3": "bl", "4": "br"}

    def __init__(self, metricsSize, logger, coordinates_filename):
        """Create the windows, load the calibration and draw a first frame.

        :param metricsSize: ``[width, height]`` of the heatmap matrix.
        :param logger: ``logging.Logger`` used for all diagnostics.
        :param coordinates_filename: pickle file holding the four calibrated
            screen corners; it is created once all corners have been set.
        """
        self.coordinates_filename = coordinates_filename
        self.logger = logger
        self.metricsSize = metricsSize
        self.metrics = np.zeros((metricsSize[1], metricsSize[0]))  # (y, x)
        # Destination corners in the same order as coordinatesToSrc():
        # tl, tr, bl, br.
        self.screenDrawCorners = np.array([
            [0, 0],
            [metricsSize[0] - 1, 0],
            [0, metricsSize[1] - 1],
            [metricsSize[0] - 1, metricsSize[1] - 1],
        ])

        self.loadCoordinates()

        self.windowRoot = Tk.Tk()
        self.windowRoot.geometry('800x600+4000+0')
        self.windowRoot.attributes("-fullscreen", True)
        imageWindowSize = tuple(metricsSize)
        self.canvas = Tk.Canvas(
            self.windowRoot,
            width=imageWindowSize[0],
            height=imageWindowSize[1],
            bd=0,
            highlightthickness=0,
            relief='ridge',
        )
        self.canvas.pack(fill="both", expand=True)

        self.mapSize = [1200, 400]
        self.mapWindowRoot = Tk.Toplevel(master=self.windowRoot)
        self.mapWindowRoot.geometry('%dx%d' % tuple(self.mapSize))
        self.mapCanvas = Tk.Canvas(
            self.mapWindowRoot,
            width=self.mapSize[0],
            height=self.mapSize[1],
            bd=0,
            highlightthickness=0,
            relief='ridge',
        )
        # Two thin bars standing in for the screen in the two map views.
        self.mapCanvas.create_rectangle(
            20, 50, 22, 350, fill="black", width=0, tags="screenTop")
        self.mapCanvas.create_rectangle(
            600, 50, 602, 300, fill="black", width=0, tags="screenSide")
        self.mapCanvas.pack(fill="both", expand=True)

        # NOTE(review): the original passed an empty event sequence here,
        # which Tk cannot bind. The handler reads ``event.char``, so a
        # key-press binding is assumed to be what was intended.
        self.windowRoot.bind("<Key>", self.onKeyPress)
        self.mapWindowRoot.bind("<Key>", self.onKeyPress)

        # Placeholder target so that the map has something to draw before the
        # first frame arrives.
        self.currentTargets = [{
            u'confidence': 0.983333,
            u'head_rot': [0.270533, -0.0669274, 0.113554],
            u'gaze_angle': [0.025313, 0.403179],
            u'fid': 0,
            u'head_pos': [73.5302, 26.4475, 399.764],
            'target': [100, 100],
        }]

        # Have a pre-created blurred circle, scaled to half size and
        # normalised to the range 0..1. Integer division keeps the size
        # integral on Python 3 as well.
        circle = Image.open("circle.png").convert("L")
        halfSize = (circle.size[0] // 2, circle.size[1] // 2)
        self.circle = np.array(circle.resize(halfSize)).astype(float)
        self.circle = self.circle / np.max(self.circle)
        self.circleRadius = float(self.circle.shape[0] - 1) / 2

        # Test drawing of a metric.
        self.addMetric(300, 300, 2)

        self.windowRoot.update()
        # geometry() returns "WxH+X+Y"; keep only W and H.
        size = self.windowRoot.geometry().split("+")[0].split("x")
        self.windowGeometry = (int(size[0]), int(size[1]))
        self.logger.info("Window size: %s", self.windowGeometry)
        # Reference to the PhotoImage currently on the canvas; Tk does not
        # keep the Python object alive by itself.
        self._tkImage = None
        self.updateWindow()

    def onKeyPress(self, event=None, charachter=None):
        """Handle a key press from Tk (``event``) or stdin (``charachter``).

        Keys 1-4 store the first current target as the top-left, top-right,
        bottom-left or bottom-right screen corner; ``q`` exits.
        """
        if event is None and charachter is not None:
            k = charachter
        elif event is not None:
            k = event.char
        else:
            return

        print("PRESSED %s" % k)

        if k.isdigit():
            # Snapshot: the camera thread replaces this list concurrently.
            targets = self.currentTargets
            print(len(targets))
            corner = self.CORNER_KEYS.get(k)
            if targets and corner is not None:
                self.setCoordinate(corner, targets[0])
        elif k == "q":
            sys.exit()

    def addMetric(self, x, y, confidence):
        """Add the blurred circle, scaled by ``confidence``, centred on (x, y).

        The circle is clipped against the borders of the metrics matrix. A
        single radius is used for both axes, so the circle image is assumed
        to be square.
        """
        s = self.metrics.shape  # y, x
        radius = self.circleRadius

        # Part of the circle image that falls inside the matrix. The circle
        # is indexed [y, x], so axis 0 bounds Y and axis 1 bounds X.
        startX = int(radius - x if x <= radius else 0)
        endX = int(s[1] - x + radius if x >= (s[1] - radius)
                   else self.circle.shape[1])
        startY = int(radius - y if y <= radius else 0)
        endY = int(s[0] - y + radius if y >= (s[0] - radius)
                   else self.circle.shape[0])

        # Matching region of the metrics matrix.
        mStartX = int(max(0, x - radius))
        mEndX = int(min(s[1], x + radius + 1))
        mStartY = int(max(0, y - radius))
        mEndY = int(min(s[0], y + radius + 1))

        self.logger.debug(
            "Add metric at (%d,%d), circle %d:%d,%d:%d, matrix %d:%d,%d:%d",
            x, y, startY, endY, startX, endX,
            mStartY, mEndY, mStartX, mEndX)

        circlePart = self.circle[startY:endY, startX:endX] * confidence
        self.metrics[mStartY:mEndY, mStartX:mEndX] += circlePart

    def keepWindowUpdated(self):
        """Redraw the windows forever; never returns."""
        while True:
            self.updateWindow()

    def updateFromJson(self, frame):
        """Process one frame: a list of face dicts as emitted by OpenFace.

        Each face looks like::

            {
                u'confidence': 0.983333,
                u'head_rot': [0.270533, -0.0669274, 0.113554],
                u'gaze_angle': [0.025313, 0.403179],
                u'fid': 0,
                u'head_pos': [73.5302, 26.4475, 399.764]
            }

        Every face gets a ``'target'`` entry (screen position, or ``None``
        while the screen is not calibrated) and replaces the current targets.
        Targets that fall on the screen are added to the heatmap.
        """
        t1 = time.time()
        self.logger.info("Received %s", frame)

        transform = self.transform
        currentTargets = []
        for face in frame:
            x, y = self.getTargetOfFace(face)
            self.logger.debug("Face %d on %s", face['fid'], [x, y])

            if transform is None:
                # Not calibrated yet: keep the face so the corner keys can
                # calibrate against it, but there is nothing to measure.
                face['target'] = None
                currentTargets.append(face)
                continue

            targetPoint = transform(np.array([x, y]))
            self.logger.info("Looking at %s", targetPoint)
            targetInt = (int(targetPoint[0]), int(targetPoint[1]))
            face['target'] = [targetInt[0], targetInt[1]]
            currentTargets.append(face)

            # Only measure points that fit on the screen.
            onScreen = (0 <= targetInt[0] < self.metricsSize[0]
                        and 0 <= targetInt[1] < self.metricsSize[1])
            if onScreen:
                self.addMetric(targetInt[0], targetInt[1], face['confidence'])

        t2 = time.time()
        self.logger.debug("Update took %fs", t2 - t1)
        self.currentTargets = currentTargets

    def updateMap(self):
        """Redraw head positions and gaze lines in the map window."""
        self.mapCanvas.delete("figure")

        # The scale is derived from the calibrated corners, so there is
        # nothing meaningful to draw before calibration.
        if not self.hasAllCoordinates():
            return

        virtualWidth = 300
        virtualHeight = 250
        screenSpan = abs(self.coordinates['tl'][0] - self.coordinates['tr'][0])
        mmPerPixel = screenSpan / float(virtualWidth)
        if mmPerPixel == 0:
            # Degenerate calibration (tl and tr share an x); avoid dividing
            # by zero.
            return

        for face in self.currentTargets:
            dx = face['head_pos'][0] / mmPerPixel  # left/right
            dy = face['head_pos'][1] / mmPerPixel  # top/down
            dz = face['head_pos'][2] / mmPerPixel  # front/back

            p1x = int(20 + dz)
            p1y = int(virtualWidth / 2 + 50 + dx)
            p2x = int(600 + dz)
            p2y = int(virtualHeight / 2 + 50 + dy)
            self.mapCanvas.create_oval(
                p1x - 3, p1y - 3, p1x + 3, p1y + 3,
                fill="red", width=0, tags="figure")
            self.mapCanvas.create_oval(
                p2x - 3, p2y - 3, p2x + 3, p2y + 3,
                fill="red", width=0, tags="figure")

            target = face.get('target')
            if target is None:
                # Face was received before calibration finished.
                continue

            p3x = int(20)
            p3y = int(float(target[0]) / self.metricsSize[0] * virtualWidth
                      + 50)
            p4x = int(600)
            p4y = int(float(target[1]) / self.metricsSize[1] * virtualHeight
                      + 50)
            self.mapCanvas.create_line(
                p1x, p1y, p3x, p3y, fill="green", tags="figure")
            self.mapCanvas.create_line(
                p2x, p2y, p4x, p4y, fill="green", tags="figure")
            self.logger.debug(
                "DRAW FACE TO %s %s AND %s", face, (p1x, p1y), (p2x, p2y))

    def updateWindow(self):
        """Render the metrics matrix through a colormap and show it."""
        t1 = time.time()
        # The floor of 18 keeps a nearly empty matrix from saturating.
        normalisedMetrics = self.metrics / (max(18, np.max(self.metrics)))
        # Convert to colormap, thanks to:
        # https://stackoverflow.com/a/10967471
        colormap = cm.CMRmap
        normalisedMetrics = np.uint8(colormap(normalisedMetrics) * 255)
        image = Image.fromarray(normalisedMetrics)

        self.updateMap()

        image = image.resize((self.windowGeometry[0], self.windowGeometry[1]))
        tkpi = ImageTk.PhotoImage(image)
        self.canvas.delete("IMG")
        self.canvas.create_image(
            self.windowGeometry[0] / 2,
            self.windowGeometry[1] / 2,
            image=tkpi,
            tags="IMG",
            anchor="center",
        )
        # Keep the image referenced until the next redraw replaces it.
        self._tkImage = tkpi
        self.windowRoot.update()
        t2 = time.time()
        self.logger.debug("Draw took %fs", t2 - t1)

    def getTargetOfFace(self, face):
        """Return the untransformed ``(x, y)`` gaze target of a face dict.

        Per axis: ``arctan(gaze_angle) * head_pos[2] + head_pos[axis]``.
        """
        # NOTE(review): arctan() of a value named "angle" looks suspicious
        # (tan() would project an angle over a distance) -- confirm against
        # what the modified OpenFace actually emits before changing it.
        x = np.arctan(face['gaze_angle'][0]) * face['head_pos'][2] \
            + face['head_pos'][0]
        y = np.arctan(face['gaze_angle'][1]) * face['head_pos'][2] \
            + face['head_pos'][1]
        return (x, y)

    def loadCoordinates(self):
        """Load the screen boundary coordinates and rebuild the transform."""
        if os.path.exists(self.coordinates_filename):
            # NOTE: pickle can execute arbitrary code while loading; only
            # point --coordinates at files this program wrote itself.
            with open(self.coordinates_filename, "rb") as fp:
                self.coordinates = pickle.load(fp)
        else:
            self.coordinates = {'tl': None, 'tr': None, 'bl': None, 'br': None}
        self.updateTransform()

    def saveCoordinates(self):
        """Write the current corner coordinates to the coordinates file."""
        self.logger.debug("%s", list(self.coordinates.values()))
        with open(self.coordinates_filename, "wb") as fp:
            pickle.dump(self.coordinates, fp)
        self.logger.info("Saved coordinates to %s", self.coordinates_filename)

    def updateTransform(self):
        """(Re)create the perspective transform, or clear it if uncalibrated."""
        if self.hasAllCoordinates():
            self.transform = create_perspective_transform(
                coordinatesToSrc(self.coordinates), self.screenDrawCorners)
        else:
            self.transform = None
        self.logger.debug("Corners: %s", self.screenDrawCorners)

    def hasAllCoordinates(self):
        """Return True once all four screen corners have been set."""
        return not any(x is None for x in self.coordinates.values())

    def setCoordinate(self, pos, face):
        """Store the gaze target of ``face`` as corner ``pos``.

        ``pos`` is one of ``'tl'``, ``'tr'``, ``'bl'``, ``'br'``. When all
        corners are known the coordinates are saved and the transform is
        rebuilt.
        """
        self.coordinates[pos] = self.getTargetOfFace(face)
        print(self.hasAllCoordinates(), self.coordinates.values())
        if self.hasAllCoordinates():
            self.logger.warning("Go create new transform")
            self.saveCoordinates()
            self.updateTransform()

    def runCam(self, openface_exec, device):
        """Run OpenFace on ``device`` and feed every output line to the map.

        Each line is expected to be one JSON frame. After each frame a single
        character is polled from (non-blocking) stdin and handled as a key
        press. Lines that cannot be processed are logged and skipped.
        """
        cmd = [
            openface_exec,
            "-device", str(device),
            "-cam_width", "1280",
            "-cam_height", "720",
        ]
        for output in execute(cmd):
            try:
                frame = json.loads(output)
                self.updateFromJson(frame)
                try:
                    c = sys.stdin.read(1)
                except IOError:
                    # Nothing to read on the non-blocking stdin.
                    c = None
                if c:
                    self.onKeyPress(charachter=c)
            except Exception as e:
                self.logger.warning(str(e))
                self.logger.warning("received %s", output)


def main(openface_exec, coordinates_filename, device=0):
    """Set up logging and the terminal, then run the heatmap until exit.

    :param openface_exec: path of the modified OpenFace executable.
    :param coordinates_filename: pickle file with the screen calibration.
    :param device: webcam device number handed to OpenFace.
    """
    logging.basicConfig(
        format='%(asctime)-15s %(name)s %(levelname)s: %(message)s')
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.WARNING)

    # Put stdin in non-canonical, no-echo, non-blocking mode so single key
    # presses can be polled from the camera loop.
    fd = sys.stdin.fileno()
    oldterm = termios.tcgetattr(fd)
    newattr = termios.tcgetattr(fd)
    newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
    termios.tcsetattr(fd, termios.TCSANOW, newattr)

    oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

    try:
        # Size of the metrics matrix: [width, height].
        metricsSize = [800, 600]

        heatmap = Heatmap(metricsSize, logger, coordinates_filename)
        # The camera loop runs in a worker thread; Tk stays on this thread.
        thread = Thread(target=heatmap.runCam, args=(openface_exec, device))
        thread.start()
        heatmap.keepWindowUpdated()
    finally:
        # Always hand the terminal back in its original state.
        termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
        fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)


# thanks to https://stackoverflow.com/a/4417735
def execute(cmd):
    """Run ``cmd`` and yield its stdout line by line.

    :raises subprocess.CalledProcessError: when the process ends with a
        non-zero return code (raised after the last line was yielded).
    """
    popen = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, universal_newlines=True)
    for stdout_line in iter(popen.stdout.readline, ""):
        yield stdout_line
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)


def create_perspective_transform_matrix(src, dst):
    """Create a perspective transformation matrix.

    The matrix transforms points in quadrilateral ``src`` to the
    corresponding points on quadrilateral ``dst``.

    Will raise a ``np.linalg.LinAlgError`` on invalid input.
    """
    # See:
    # * http://xenia.media.mit.edu/~cwren/interpolator/
    # * http://stackoverflow.com/a/14178717/71522
    in_matrix = []
    for (x, y), (X, Y) in zip(src, dst):
        in_matrix.extend([
            [x, y, 1, 0, 0, 0, -X * x, -X * y],
            [0, 0, 0, x, y, 1, -Y * x, -Y * y],
        ])

    # Plain float arrays: the ``np.float`` alias no longer exists in current
    # NumPy releases.
    A = np.array(in_matrix, dtype=float)
    B = np.asarray(dst, dtype=float).reshape(8)
    # Least-squares solution through the normal equations.
    pseudo_inverse = np.dot(np.linalg.inv(np.dot(A.T, A)), A.T)
    af = np.dot(pseudo_inverse, B)
    m = np.append(np.array(af).reshape(8), 1).reshape((3, 3))
    logging.getLogger(__name__).info(
        "Created transformmatrix: src %s dst %s m %s", src, dst, m)
    return m


def _build_perspective_transform(transform_matrix, as_int, splat_args):
    """Return the point-mapping function for ``transform_matrix``."""
    def _apply(pt):
        res = np.dot(transform_matrix, ((pt[0], ), (pt[1], ), (1, )))
        res = res / res[2]
        if as_int:
            return (int(round(res[0][0])), int(round(res[1][0])))
        return (res[0][0], res[1][0])

    if splat_args:
        def perspective_transform(*pt):
            return _apply(pt)
    else:
        def perspective_transform(pt):
            return _apply(pt)
    return perspective_transform


# got this amazing thing from here: https://stackoverflow.com/a/24088499
def create_perspective_transform(src, dst, round=False, splat_args=False):
    """Return a function mapping points in quad ``src`` onto quad ``dst``.

    Example (results are approximate because of floating point)::

        transform = create_perspective_transform(
            [(0, 0), (10, 0), (10, 10), (0, 10)],
            [(50, 50), (100, 50), (100, 100), (50, 100)],
        )
        transform((5, 5))   # approximately (75.0, 75.0)

    If ``round`` is ``True`` the result is rounded to the nearest integer and
    integer values are returned.

    If ``splat_args`` is ``True`` the function accepts two arguments instead
    of one tuple, e.g. ``transform(5, 5)``.

    If the input values yield an invalid transformation matrix an identity
    function is returned and its ``error`` attribute is set to a description
    of the error; otherwise ``error`` is ``None``. The matrix in use is
    available as the ``matrix`` attribute.
    """
    try:
        transform_matrix = create_perspective_transform_matrix(src, dst)
        error = None
    except np.linalg.LinAlgError as e:
        transform_matrix = np.identity(3, dtype=float)
        error = "invalid input quads (%s and %s): %s" % (src, dst, e)
        error = error.replace("\n", "")

    # A closure replaces the original ``exec``-generated function, whose
    # statement form only parses on Python 2. The helper lives at module
    # level so that ``round`` there is the builtin, not this flag.
    res = _build_perspective_transform(transform_matrix, round, splat_args)
    res.matrix = transform_matrix
    res.error = error
    return res


def coordinatesToSrc(coordinates):
    """Return the corners as an array ordered tl, tr, bl, br."""
    return np.array([
        coordinates['tl'],
        coordinates['tr'],
        coordinates['bl'],
        coordinates['br'],
    ])


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='launch modified OpenFace instance & create heatmap.')
    parser.add_argument(
        '--of', default="../build/bin/FaceLandmarkVidMulti",
        help='The modified version of OpenFace\'s FaceLandmarkVidMulti')
    parser.add_argument(
        '--coordinates', default="coordinates.p",
        help='Use a specific coordinates.p file')
    parser.add_argument(
        '--device', type=int, default=0,
        help='Webcam device nr. to use')

    args = parser.parse_args()

    main(
        openface_exec=args.of,
        coordinates_filename=args.coordinates,
        device=args.device,
    )