import numpy as np import os import pickle import logging from PIL import Image, ImageDraw,ImageTk from matplotlib import cm import sys if sys.version_info[0] < 3: import Tkinter as Tk else: import tkinter as Tk import time import argparse import subprocess import json from threading import Thread import termios, fcntl, os import glob import os class Heatmap: def __init__(self, metricsSize, logger, coordinates_filename, tag): self.coordinates_filename = coordinates_filename self.logger = logger self.metricsSize = metricsSize self.tag = tag self.metricSaveDiff = 60*5 # save metrics every 5 minutes self.loadMetrics() self.screenDrawCorners = np.array([ [0,0], [metricsSize[0]-1,0], [0, metricsSize[1]-1], [metricsSize[0]-1,metricsSize[1]-1] ]) self.loadCoordinates() self.windowRoot = Tk.Tk() self.windowRoot.geometry('800x600+4000+0') self.windowRoot.title('Heatmap') self.windowRoot.attributes("-fullscreen", True) imageWindowSize = tuple(metricsSize) # self.windowRoot.geometry('%dx%d+%d+%d' % (imageWindowSize[0],imageWindowSize[1],0,0)) # we go full screen so not needed self.canvas = Tk.Canvas(self.windowRoot,width=imageWindowSize[0],height=imageWindowSize[1],bd=0, highlightthickness=0, relief='ridge') self.canvas.pack(fill="both", expand=True) self.mapSize = [1600,600] self.mapScreenWidth = 300 self.mapScreenHeight = 250 self.mapWindowRoot = Tk.Toplevel(master=self.windowRoot) # self.mapWindowRoot.configure(background="black") self.mapWindowRoot.geometry('%dx%d+0+800' % tuple(self.mapSize)) self.mapWindowRoot.title("Tracking Overview") self.mapCanvas = Tk.Canvas(self.mapWindowRoot,width=self.mapSize[0],height=self.mapSize[1],bd=0, highlightthickness=0, relief='ridge') self.mapCanvas.create_rectangle(20, self.mapSize[1]/2 - self.mapScreenWidth /2, 22, self.mapSize[1]/2 + self.mapScreenWidth /2, fill="white", width=0, tags="screenTop") self.mapCanvas.create_rectangle(600, self.mapSize[1]/2 - self.mapScreenHeight /2, 602, self.mapSize[1]/2 + self.mapScreenHeight /2, fill="white", width=0, tags="screenSide") self.mapCanvas.configure(background='black') # self.mapImage = Image.new('RGB', (500,350)) # self.mapDraw = ImageDraw.Draw(self.mapImage) self.mapCanvas.pack(fill="both", expand=True) self.windowRoot.bind("", self.onKeyPress) self.mapWindowRoot.bind("", self.onKeyPress) # self.updateWindow() self.currentTargets = [{ u'confidence': 0.983333, u'head_rot': [0.270533, -0.0669274, 0.113554], u'gaze_angle': [0.025313, 0.403179], u'fid': 0, u'head_pos': [73.5302, 26.4475, 399.764], 'target': [100,100] }] # Have a pre-created blurred circle. circle = Image.open("circle.png").convert("L") self.circle = np.array(circle.resize((circle.size[0]/2,circle.size[1]/2))).astype(float) self.circle = self.circle/np.max(self.circle) self.circleRadius = float(self.circle.shape[0]-1)/2 # test drawing of metrics: # self.addMetric(5,4,1) # self.addMetric(100,100,1) # self.addMetric(200,200,1) self.addMetric(300,300,2) # self.addMetric(1920,1070,2) self.windowRoot.update() self.windowGeometry = self.windowRoot.geometry().split("+")[0].split("x") self.windowGeometry = (int(self.windowGeometry[0]), int(self.windowGeometry[0])) self.logger.info("Window size: %s", self.windowGeometry) self.updateWindow() def getOutputPrefix(self): return "output/metrics-%s-%dx%d-" % (self.tag, self.metricsSize[0], self.metricsSize[1]) def loadMetrics(self): list_of_files = glob.glob('%s*.p' % self.getOutputPrefix()) if len(list_of_files) < 1: self.metrics = np.zeros((self.metricsSize[1], self.metricsSize[0])) # (y, x) else: latest_file = max(list_of_files, key=os.path.getctime) with open(latest_file, "rb") as f: self.metrics = pickle.load(f) self.lastMetricSave = time.time() def saveMetrics(self): name = "{}{}.p".format(self.getOutputPrefix(), time.strftime("%Y-%m-%d %H:%M:%S")) with open(name, "wb") as fp: pickle.dump(self.metrics, fp) self.lastMetricSave = time.time() def onKeyPress(self, event=None, charachter=None): if event is None and charachter is not None: k = charachter else: k = event.char print("PRESSED %s" % k) if k.isdigit(): print(len(self.currentTargets)) if len(self.currentTargets): c = int(k) if c == 1: self.setCoordinate("tl", self.currentTargets[0]) elif c == 2: self.setCoordinate("tr", self.currentTargets[0]) elif c == 3: self.setCoordinate("bl", self.currentTargets[0]) elif c == 4: self.setCoordinate("br", self.currentTargets[0]) else: if k == "q": exit() def addMetric(self,x,y,confidence): s = self.metrics.shape # y,x startX = int( self.circleRadius - x if x <= self.circleRadius else 0) endX = int( s[1] - x + self.circleRadius if x >= (s[1] - self.circleRadius) else self.circle.shape[0]) startY = int( self.circleRadius - y if y <= self.circleRadius else 0) endY = int( s[0] - y + self.circleRadius if y >= s[0] - self.circleRadius else self.circle.shape[1]) mStartX = int(max(0, x-self.circleRadius)) mEndX = int(min(s[1], x+self.circleRadius+1)) mStartY = int(max(0, y-self.circleRadius)) mEndY = int(min(s[0], y+self.circleRadius+1)) self.logger.debug("Add metric at (%(x)d,%(y)d), circle %(startY)d:%(endY)d,%(startX)d:%(endX)d, matrix %(mStartY)d:%(mEndY)d,%(mStartX)d:%(mEndX)d" % locals()) circlePart = self.circle[startY:endY,startX:endX] * confidence self.metrics[mStartY:mEndY,mStartX:mEndX] += circlePart def keepWindowUpdated(self): while True: self.updateWindow() if time.time() - self.lastMetricSave > self.metricSaveDiff: self.saveMetrics() def updateFromJson(self, frame): t1 = time.time() self.logger.info("Received %s", frame) currentTargets = [] for face in frame: # { # u'confidence': 0.983333, # u'head_rot': [0.270533, -0.0669274, 0.113554], # u'gaze_angle': [0.025313, 0.403179], # u'fid': 0, # u'head_pos': [73.5302, 26.4475, 399.764] # } x, y = self.getTargetOfFace(face) self.logger.debug("Face %d on %s", face['fid'], [x,y]) targetPoint = self.transform(np.array([x,y])) self.logger.info("Looking at %s", targetPoint) targetInt = (int(targetPoint[0]), int(targetPoint[1])) # check if point fits on screen: # if so, measure it face['target'] = [targetInt[0], targetInt[1]] currentTargets.append(face) if targetInt[0] >= 0 and targetInt[1] >= 0 and targetInt[1] < self.metricsSize[1] and targetInt[0] < self.metricsSize[0]: self.addMetric(targetInt[0], targetInt[1], face['confidence']) t2 = time.time() self.logger.debug("Update took %fs", t2-t1) self.currentTargets = currentTargets # self.updateWindow() def updateMap(self): self.mapCanvas.delete("figure") mmPerPixel = abs(self.coordinates['tl'][0] - self.coordinates['tr'][0])/float(self.mapScreenWidth) for face in self.currentTargets: dx = face['head_pos'][0]/mmPerPixel #left/right dy = face['head_pos'][1]/mmPerPixel # top/down dz = face['head_pos'][2]/mmPerPixel # front/back p1x = int(20 + dz) p1y = int(self.mapSize[1]/2 - dx) p2x = int(600 + dz) p2y = int(self.mapSize[1]/2 + dy) self.mapCanvas.create_oval(p1x-3, p1y-3, p1x+3, p1y+3, fill="red", width=0, tags="figure") self.mapCanvas.create_oval(p2x-3, p2y-3, p2x+3, p2y+3, fill="red", width=0, tags="figure") p3x = int(20) p3y = int(self.mapSize[1]/2 + self.mapScreenWidth/2 - (float(face['target'][0]) / self.metricsSize[0] * self.mapScreenWidth)) p4x = int(600) p4y = int(float(face['target'][1]) / self.metricsSize[1] * self.mapScreenHeight + self.mapSize[1]/2 - self.mapScreenHeight/2) self.mapCanvas.create_line(p1x,p1y, p3x,p3y, width=2, fill="#ff0", tags="figure") self.mapCanvas.create_line(p2x,p2y, p4x,p4y, width=2, fill="#ff0", tags="figure") self.logger.debug("DRAW FACE TO", face, (p1x,p1y), "AND", (p2x,p2y)) def updateTable(self): # clear terminal os.system('cls' if os.name == 'nt' else 'clear') print("{} gazes".format(len(self.currentTargets))) print("{:>5} {:>10} {:>10} {:>10} {:>5} {:>5}".format(" ", "x", "y", "z", "angle", "angle", )) for face in self.currentTargets: print(u"{:>5} {:>10} mm {:>10} mm {:>10} mm {:>5.2f}\xb0 {:>5.2f}\xb0".format(face['fid'], face['head_pos'][0], face['head_pos'][1], face['head_pos'][2], face['gaze_angle'][0], face['gaze_angle'][1])) def updateWindow(self): t1 = time.time() normalisedMetrics = self.metrics / (max(18,np.max(self.metrics))) # convert to colormap, thanks to: https://stackoverflow.com/a/10967471 colormap = cm.plasma colormap = cm.CMRmap colormap = cm.nipy_spectral normalisedMetrics = np.uint8(colormap(normalisedMetrics)*255) image = Image.fromarray(normalisedMetrics) self.updateMap() self.updateTable() # Too clunky for now: # if len(self.currentTargets) > 0: # c = ImageDraw.Draw(image) # for t in self.currentTargets: # c.ellipse((t[0]-20, t[1]-20, t[0]+20, t[1]+20), fill=(100,100,100,100))) # del c # wpercent = (self.windowGeometry[0] / float(image.size[0])) # hsize = int((float(image.size[1]) * float(wpercent))) image = image.resize((self.windowGeometry[0], self.windowGeometry[1])) tkpi = ImageTk.PhotoImage(image) self.canvas.delete("IMG") imagesprite = self.canvas.create_image(self.windowGeometry[0]/2,self.windowGeometry[1]/2 ,image=tkpi, tags="IMG", anchor="center") self.windowRoot.update() t2 = time.time() self.logger.debug("Draw took %fs", t2-t1) def getTargetOfFace(self, face): x = np.arctan(face['gaze_angle'][0])*face['head_pos'][2] + face['head_pos'][0] y = np.arctan(face['gaze_angle'][1])*face['head_pos'][2] + face['head_pos'][1] return (x,y) def loadCoordinates(self): # coordinates of the screen boundaries if os.path.exists(self.coordinates_filename): self.coordinates = pickle.load(open(self.coordinates_filename, "rb")) else: self.coordinates = {'tl': None, 'tr': None, 'bl': None, 'br': None} self.updateTransform() def saveCoordinates(self): self.logger.debug(self.coordinates.values()) pickle.dump(self.coordinates, open( self.coordinates_filename, "wb" ) ) self.logger.info("Saved coordinates to %s", self.coordinates_filename) def updateTransform(self): if self.hasAllCoordinates() : self.transform = create_perspective_transform(coordinatesToSrc(self.coordinates), self.screenDrawCorners) else: self.transform = None self.logger.debug("Corners: %s", self.screenDrawCorners) def hasAllCoordinates(self): return not any (x is None for x in self.coordinates.values()) def setCoordinate(self, pos, face): self.coordinates[pos] = self.getTargetOfFace(face) print(self.hasAllCoordinates(), self.coordinates.values()) if self.hasAllCoordinates(): self.logger.warning("Go create new transform") self.saveCoordinates() self.updateTransform() def runCam(self, openface_exec, device): for output in execute([openface_exec, "-device", str(device), "-cam_width", "1280", "-cam_height", "720"]): try: frame = json.loads(output) self.updateFromJson(frame) try: c = sys.stdin.read(1) self.onKeyPress(charachter=c) except IOError: pass except Exception as e: self.logger.warning(str(e)) self.logger.warning("received %s", output) def main(openface_exec, coordinates_filename, tag, device=0): logging.basicConfig( format='%(asctime)-15s %(name)s %(levelname)s: %(message)s' ) logger = logging.getLogger(__name__) logger.setLevel(logging.WARNING) fd = sys.stdin.fileno() oldterm = termios.tcgetattr(fd) newattr = termios.tcgetattr(fd) newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO termios.tcsetattr(fd, termios.TCSANOW, newattr) oldflags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK) try: # metrics matrix metricsSize = [1920,1080] metricsSize = [1440,900] # metricsSize = [1920/2,1080/2] # metricsSize = [800,600] heatmap = Heatmap(metricsSize, logger, coordinates_filename, tag) # heatmap.runCam(openface_exec, device) thread = Thread(target=heatmap.runCam, args=(openface_exec, device)) thread.start() heatmap.keepWindowUpdated() finally: termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm) fcntl.fcntl(fd, fcntl.F_SETFL, oldflags) # thanks to https://stackoverflow.com/a/4417735 def execute(cmd): popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True) for stdout_line in iter(popen.stdout.readline, ""): yield stdout_line popen.stdout.close() return_code = popen.wait() if return_code: raise subprocess.CalledProcessError(return_code, cmd) def create_perspective_transform_matrix(src, dst): """ Creates a perspective transformation matrix which transforms points in quadrilateral ``src`` to the corresponding points on quadrilateral ``dst``. Will raise a ``np.linalg.LinAlgError`` on invalid input. """ # See: # * http://xenia.media.mit.edu/~cwren/interpolator/ # * http://stackoverflow.com/a/14178717/71522 in_matrix = [] for (x, y), (X, Y) in zip(src, dst): in_matrix.extend([ [x, y, 1, 0, 0, 0, -X * x, -X * y], [0, 0, 0, x, y, 1, -Y * x, -Y * y], ]) A = np.matrix(in_matrix, dtype=np.float) B = np.array(dst).reshape(8) af = np.dot(np.linalg.inv(A.T * A) * A.T, B) m = np.append(np.array(af).reshape(8), 1).reshape((3, 3)) logging.getLogger(__name__).info("Created transformmatrix: src %s dst %s m %s", src, dst, m) return m # got this amazing thing from here: https://stackoverflow.com/a/24088499 def create_perspective_transform(src, dst, round=False, splat_args=False): """ Returns a function which will transform points in quadrilateral ``src`` to the corresponding points on quadrilateral ``dst``:: >>> transform = create_perspective_transform( ... [(0, 0), (10, 0), (10, 10), (0, 10)], ... [(50, 50), (100, 50), (100, 100), (50, 100)], ... ) >>> transform((5, 5)) (74.99999999999639, 74.999999999999957) If ``round`` is ``True`` then points will be rounded to the nearest integer and integer values will be returned. >>> transform = create_perspective_transform( ... [(0, 0), (10, 0), (10, 10), (0, 10)], ... [(50, 50), (100, 50), (100, 100), (50, 100)], ... round=True, ... ) >>> transform((5, 5)) (75, 75) If ``splat_args`` is ``True`` the function will accept two arguments instead of a tuple. >>> transform = create_perspective_transform( ... [(0, 0), (10, 0), (10, 10), (0, 10)], ... [(50, 50), (100, 50), (100, 100), (50, 100)], ... splat_args=True, ... ) >>> transform(5, 5) (74.99999999999639, 74.999999999999957) If the input values yield an invalid transformation matrix an identity function will be returned and the ``error`` attribute will be set to a description of the error:: >>> tranform = create_perspective_transform( ... np.zeros((4, 2)), ... np.zeros((4, 2)), ... ) >>> transform((5, 5)) (5.0, 5.0) >>> transform.error 'invalid input quads (...): Singular matrix """ try: transform_matrix = create_perspective_transform_matrix(src, dst) error = None except np.linalg.LinAlgError as e: transform_matrix = np.identity(3, dtype=np.float) error = "invalid input quads (%s and %s): %s" %(src, dst, e) error = error.replace("\n", "") to_eval = "def perspective_transform(%s):\n" %( splat_args and "*pt" or "pt", ) to_eval += " res = np.dot(transform_matrix, ((pt[0], ), (pt[1], ), (1, )))\n" to_eval += " res = res / res[2]\n" if round: to_eval += " return (int(round(res[0][0])), int(round(res[1][0])))\n" else: to_eval += " return (res[0][0], res[1][0])\n" locals = { "transform_matrix": transform_matrix, } locals.update(globals()) exec to_eval in locals, locals res = locals["perspective_transform"] res.matrix = transform_matrix res.error = error return res def coordinatesToSrc(coordinates): return np.array([coordinates['tl'], coordinates['tr'],coordinates['bl'], coordinates['br']]) if __name__ == '__main__': parser = argparse.ArgumentParser(description='launch modified OpenFace instance & create heatmap.') parser.add_argument('--of', default="../build/bin/FaceLandmarkVidMulti", help='The modified version of OpenFace\'s FaceLandmarkVidMulti') parser.add_argument('--coordinates', default="coordinates.p", help='Use a specific coordinates.p file') parser.add_argument('--device', type=int, default=0, help='Webcam device nr. to use') parser.add_argument('--tag', type=str, default="default", help='Heatmap instance. Determines the name of saved (& thus loaded) files') args = parser.parse_args() main(openface_exec=args.of, coordinates_filename=args.coordinates, device=args.device, tag=args.tag)