import os from PIL import Image, ImageDraw import argparse import json import time import glob import numpy as np parser = argparse.ArgumentParser(description='Parses opencv-webcam-demo json output files and collects statistics') parser.add_argument('--frameOutput', '-o', required=True, help='directory to look for frames & json') parser.add_argument('--status', '-s', action='store_true', help='Keep status of last frame') parser.add_argument('--cutAllFaces', action='store_true', help='Cut out all faces from all frames') parser.add_argument('--sum', action='store_true', help='Get total scores over all time') parser.add_argument('--disonant', action='store_true', help='Get most disonant faces over time') args = parser.parse_args() faces = [] class Face: def __init__(self, frame, data): self.id = data['id'] self.frame = frame # Frame class self.data = data # json data self.disonanceScore = None # a first attempt, can be deprecated? self.anomalyScore = None def getFaceImg(self): r = self.data['rect'] return self.frame.getImg().crop((int(r['x']), int(r['y']), int(r['x']+r['w']), int(r['y']+r['h']))) def getCharacteristicVector(self): self.vector = np.array([ self.data["smile"], self.data["innerBrowRaise"], self.data["browRaise"], self.data["browFurrow"], self.data["noseWrinkle"], self.data["upperLipRaise"], self.data["lipCornerDepressor"], self.data["chinRaise"], self.data["lipPucker"], self.data["lipPress"], self.data["lipSuck"], self.data["mouthOpen"], self.data["smirk"], self.data["eyeClosure"], # self.data["attention"], self.data["eyeWiden"], self.data["cheekRaise"], self.data["lidTighten"], self.data["dimpler"], self.data["lipStretch"], self.data["jawDrop"], ]) return self.vector def setAnomalyScore(self, score): self.anomalyScore = score class Frame: """ Everything for an analysed frame """ def __init__(self, outputPath, nr): self.outputPath = outputPath self.nr = nr self.name = "frame%06d" % nr self.jsonPath = os.path.join(outputPath, ("frame%06d" % (nr)) + ".json") self.imgPath = os.path.join(outputPath, self.name + ".jpg") self.faces = None # init with getFaces def getTime(self): return os.path.getmtime(self.imgPath) def getJson(self): #~ try: with open(self.jsonPath) as fp: return json.load(fp) #~ except Exception as e: #~ # no json file yet? #~ return None def getImg(self): return Image.open(self.imgPath) def getFaces(self): if self.faces is None: j = self.getJson() self.faces = [Face(self, f) for f in j['faces']] faces.extend(self.faces) return self.faces def updateDisonanceScores(self): totalValence = 0.0 totalFaces = 0 for face in self.getFaces(): totalValence += face.data['valence'] totalFaces += 1 if totalFaces == 0: return avgValence = totalValence / totalFaces for face in self.getFaces(): face.disonanceScore = abs(face.data['valence'] - avgValence) def getAverageV(self): vectors = [face.getCharacteristicVector() for face in self.getFaces()] vAvg = np.mean(vectors, axis=0) return vAvg def updateAnomalyScores(self): vAvg = self.getAverageV() for face in self.getFaces(): face.setAnomalyScore(np.linalg.norm(face.getCharacteristicVector() - vAvg)) def exists(self): return os.path.exists(self.jsonPath) and os.path.exists(self.imgPath) frames = {} def loadFrames(frameDir): global frames nr = 2 nextFrame = Frame(frameDir, nr) # TODO; make threaded and infinite loop that updates global frames while nextFrame.exists(): frames[nr] = nextFrame nr+=1 nextFrame = Frame(frameDir, nr) return frames def getLastFrame(frameDir): jsons = sorted(glob.glob(os.path.join(frameDir, "*.json"))) if len(jsons): lastJson = jsons[-1] lastNr = int(lastJson[-11:-5]) frame = Frame(frameDir, lastNr) return frame return None def cutOutFaces(frame, targetDir): for faceNr, face in enumerate(frame.getFaces()): print(faceNr, face) img = face.getFaceImg() faceImgPath = os.path.join(targetDir, frame.name + "-%s.jpg" % face.id) print(faceImgPath) img.save(faceImgPath) pass def validateJsonTimes(): lastTime = None for frameNr, frame in loadFrames(args.frameOutput).items(): thisTime = frame.getJson()['t'] #print(frameNr, thisTime) if not (lastTime is None) and lastTime > thisTime: print "ERRROR!! Time error at %s. Restarted scanner there?" % frameNr lastTime = thisTime def sumEmotions(): total = 0. summed = 0. items = 0 for frameNr, frame in loadFrames(args.frameOutput).items(): for face in frame.getFaces(): total += abs(face.data['valence']) summed += face.data['valence'] items += 1 average = summed / items print ("Total emotion %d, positivity score %d (average: %s)" % (total, summed, average)) def getMostDisonant(nr = 5): for frameNr, frame in loadFrames(args.frameOutput).items(): frame.updateDisonanceScores() faces.sort(key=lambda x: x.disonanceScore, reverse=True) mostDisonantFaces = faces[:nr] for face in mostDisonantFaces: print("Frame %d, face %d, score %d, valence %d" % (face.frame.nr, face.id, face.disonanceScore, face.data['valence'])) face.getFaceImg().show() def getAnomalies(nr = 5): for frameNr, frame in loadFrames(args.frameOutput).items(): frame.updateAnomalyScores() faces.sort(key=lambda x: x.anomalyScore, reverse=True) anomalies = faces[:nr] for face in anomalies: print("Frame %d, face %d, score %d" % (face.frame.nr, face.id, face.anomalyScore)) #~ getCharacteristicVector face.getFaceImg().show() def printFrameStats(frame): os.system('clear') print(time.time()) print( ("Nr: %d" % frame.nr).ljust(40) + ("t: %f" % frame.getJson()['t']) ) #~ print faces = frame.getFaces() print("Faces: %d" % len(faces)) if len(faces) < 1: return params = ['smile', 'browFurrow'] q0s = [np.percentile(np.array([f.data[param] for f in faces]),0) for param in params] q1s = [np.percentile(np.array([f.data[param] for f in faces]),25) for param in params] q2s = [np.percentile(np.array([f.data[param] for f in faces]),50) for param in params] q3s = [np.percentile(np.array([f.data[param] for f in faces]),75) for param in params] q4s = [np.percentile(np.array([f.data[param] for f in faces]),100) for param in params] print " ".ljust(8), for p in params: print p.center(20), print "" print(" 0% " + "".join([("%f%%" % q).rjust(20) for q in q0s])) print(" q1 " + "".join([("%f%%" % q).rjust(20) for q in q1s])) print(" median " + "".join([("%f%%" % q).rjust(20) for q in q2s])) print(" q3 " + "".join([("%f%%" % q).rjust(20) for q in q3s])) print(" 100% " + "".join([("%f%%" % q).rjust(20) for q in q4s])) #~ TODO: speaker stats frame.updateDisonanceScores() dissonantFace = max(faces,key=lambda f: f.disonanceScore) #~ dissonantFace.getFaceImg() def monitorStatus(frameDir): while True: frame = getLastFrame(frameDir) if not frame is None: printFrameStats(frame) # don't check too often time.sleep(.5) validateJsonTimes() if args.sum: sumEmotions() if args.disonant: getMostDisonant() if args.cutAllFaces: faceDir = os.path.join(args.frameOutput, 'faces') if not os.path.exists(faceDir): os.mkdir(faceDir) for frameNr, frame in loadFrames(args.frameOutput).items(): cutOutFaces(faceDir) if args.status: monitorStatus(args.frameOutput)