# moodmeter/parse_output.py

import os
from PIL import Image, ImageDraw, ImageTk
import argparse
import json
import time
import glob
import numpy as np
import Tkinter
facialParameters = [
"smile",
"innerBrowRaise",
"browRaise",
"browFurrow",
"noseWrinkle",
"upperLipRaise",
"lipCornerDepressor",
"chinRaise",
"lipPucker",
"lipPress",
"lipSuck",
"mouthOpen",
"smirk",
#~ "attention",
"eyeClosure",
"eyeWiden",
"cheekRaise",
"lidTighten",
"dimpler",
"lipStretch",
"jawDrop",
]
parser = argparse.ArgumentParser(description='Parses opencv-webcam-demo json output files and collects statistics')
parser.add_argument('--frameOutput', '-o', required=True, help='directory to look for frames & json')
parser.add_argument('--status', action='store_true', help='Continuously print statistics for the most recent frame')
parser.add_argument('--cutAllFaces', action='store_true', help='Cut out all faces from all frames')
parser.add_argument('--sum', action='store_true', help='Get total scores over all time')
parser.add_argument('--unique', action='store_true', help='Get most unique window')
parser.add_argument('--avg', action='store_true', help='Get most average window')
parser.add_argument('--disonant', action='store_true', help='Get most dissonant faces over time')
parser.add_argument('--window-size', '-s', type=int, default=10, help='The number of frames to group into one sliding window for analysis')
parser.add_argument("--params", "-p", type=str, nargs='+', default=facialParameters, choices=facialParameters, help="The parameters used to calculate the statistics")
args = parser.parse_args()
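
# Example invocations (the output directory below is only a placeholder;
# point -o at wherever opencv-webcam-demo writes its frame images and JSON):
#
#   python parse_output.py -o ./frameOutput --status
#   python parse_output.py -o ./frameOutput --unique --window-size 20 --params smile browRaise
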
faces = []
class Face:
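    """
    A single detected face within one Frame, wrapping the per-face JSON data
    (rect, valence, facial parameters) produced by opencv-webcam-demo.
    """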
def __init__(self, frame, data):
self.id = data['id']
self.frame = frame # Frame class
self.data = data # json data
self.disonanceScore = None # a first attempt, can be deprecated?
self.anomalyScore = None
def getFaceImg(self):
r = self.data['rect']
return self.frame.getImg().crop((int(r['x']), int(r['y']), int(r['x']+r['w']), int(r['y']+r['h'])))
def getCharacteristicVector(self, params):
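        """
        Build the face's characteristic vector: one value per requested
        facial parameter, in the order given by params.
        """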
self.vector = [self.data[p] for p in params]
return self.vector
def setAnomalyScore(self, score):
self.anomalyScore = score
class Frame:
"""
    One analysed webcam frame: the saved JPEG image plus the JSON file with per-face metrics.
"""
def __init__(self, outputPath, nr):
self.outputPath = outputPath
self.nr = nr
self.name = "frame%06d" % nr
        self.jsonPath = os.path.join(outputPath, self.name + ".json")
self.imgPath = os.path.join(outputPath, self.name + ".jpg")
self.faces = None # init with getFaces
def getTime(self):
return os.path.getmtime(self.imgPath)
def getJson(self):
#~ try:
with open(self.jsonPath) as fp:
return json.load(fp)
#~ except Exception as e:
#~ # no json file yet?
#~ return None
def getImg(self, markFaces = True):
img = Image.open(self.imgPath)
if not markFaces:
return img
draw = ImageDraw.Draw(img)
for f in self.faces:
xy1 = (int(f.data['rect']['x']), int(f.data['rect']['y']))
xy2 = (int(f.data['rect']['x'] + f.data['rect']['w']), int(f.data['rect']['y'] + f.data['rect']['h']))
draw.rectangle([xy1, xy2], outline="#ff0000")
return img
def getFaces(self):
if self.faces is None:
j = self.getJson()
self.faces = [Face(self, f) for f in j['faces']]
faces.extend(self.faces)
return self.faces
def updateDisonanceScores(self):
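        """
        Score each face in this frame by how much its valence differs from
        the frame's average valence (absolute difference).
        """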
totalValence = 0.0
totalFaces = 0
for face in self.getFaces():
totalValence += face.data['valence']
totalFaces += 1
if totalFaces == 0:
return
avgValence = totalValence / totalFaces
for face in self.getFaces():
face.disonanceScore = abs(face.data['valence'] - avgValence)
def getAverageV(self, params):
vectors = [face.getCharacteristicVector(params) for face in self.getFaces()]
vAvg = np.mean(vectors, axis=0)
return vAvg
def updateAnomalyScores(self, params):
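        """
        Score each face by the Euclidean distance between its characteristic
        vector and the frame's average vector for the given params.
        """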
vAvg = self.getAverageV(params)
for face in self.getFaces():
            face.setAnomalyScore(np.linalg.norm(face.getCharacteristicVector(params) - vAvg))
def exists(self):
return os.path.exists(self.jsonPath) and os.path.exists(self.imgPath)
frames = {}
class Window:
def __init__(self, frameSubset):
"""
Init a sliding window for given Frame-s
"""
self.frames = frameSubset
self.deviation = None
self.standardDeviation = None
def getFaces(self):
faces = []
for frame in self.frames:
faces.extend(frame.getFaces())
return faces
def getStdDev(self, params):
"""
Get standard deviation of the faces within the window for given params
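
        Note that np.std over the stacked vectors returns a single scalar
        (the spread over all faces and all parameters combined), which
        getMostContrastingWindows uses as its sort key.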
"""
vectors = [f.getCharacteristicVector(params) for f in self.getFaces()]
return np.std(vectors)
def getAverageV(self, params):
vectors = [f.getCharacteristicVector(params) for f in self.getFaces()]
vAvg = np.mean(vectors, axis=0)
return vAvg
@staticmethod
def createWindows(windowSize, frames):
"""
        Given the full dict of frames, turn it into a list of overlapping sliding windows.
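
        For example, 5 frames with a windowSize of 3 yield 3 windows:
        frames [1, 2, 3], [2, 3, 4] and [3, 4, 5].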
"""
frames = sorted(frames.items(), key=lambda f: f[0])
frames = [f[1] for f in frames]
windows = []
windowCount = len(frames) - windowSize + 1
if windowCount < 1:
raise Exception("Not enough frames ({}) for a window of size {}".format(len(frames), windowSize))
for offset in range(0, windowCount):
frameSubset = frames[offset:offset+windowSize]
windows.append(Window(frameSubset))
return windows
class WindowCollection:
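    """
    All sliding windows over the loaded frames, with helpers to pick out the
    most deviating ("unique"), most average and most contrasting windows for
    a given set of facial parameters.
    """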
def __init__(self, windowSize, frames):
self.windows = Window.createWindows(windowSize, frames)
self.frames = frames
#~ self.faces = [face for face in frame.getFaces() for frame in frames]
#~ def getMostWindowsClosestToMedian(self, nr = 5):
#~ """
#~ Get windows with the faces closest to the median
#~ """
#~ self.faces
def getWindowVectors(self, params):
return [window.getAverageV(params) for window in self.windows]
def getWindowsByDeviation(self, params):
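        """
        Sort the windows by how far their average characteristic vector lies
        from the mean over all windows, most deviating first.
        """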
vectors = self.getWindowVectors(params)
vAvg = np.mean(vectors, axis=0)
#~ diffs = [numpy.linalg.norm(v-vAvg) for v in vectors]
#~ min_index, min_value = min(enumerate(diffs), key=lambda p: p[1])
#~ max_index, max_value = max(enumerate(diffs), key=lambda p: p[1])
        # reverse so the most deviating window comes first
        return sorted(self.windows, key=lambda w: np.linalg.norm(w.getAverageV(params) - vAvg), reverse=True)
def getUniqueWindows(self, params, nr=5):
windows = self.getWindowsByDeviation(params)
return windows[0: nr]
def getMostAvgWindows(self, params, nr=5):
windows = self.getWindowsByDeviation(params)
windows.reverse()
return windows[0:nr]
def getMostContrastingWindows(self, params, nr=5):
sortedWindows = sorted(self.windows, key=lambda w: w.getStdDev(params), reverse=True)
return sortedWindows[0:nr]
def loadFrames(frameDir):
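    """
    Load all consecutively numbered frames from frameDir into the global
    frames dict (numbering starts at 2), stopping at the first missing frame.
    """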
global frames
nr = 2
nextFrame = Frame(frameDir, nr)
    # TODO: make this threaded, with an infinite loop that updates the global frames dict
while nextFrame.exists():
frames[nr] = nextFrame
nr+=1
nextFrame = Frame(frameDir, nr)
return frames
def getLastFrame(frameDir):
jsons = sorted(glob.glob(os.path.join(frameDir, "*.json")))
if len(jsons):
lastJson = jsons[-1]
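        # "frameNNNNNN.json" -> NNNNNN: the frame number is the six
        # characters just before the ".json" extension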
lastNr = int(lastJson[-11:-5])
frame = Frame(frameDir, lastNr)
return frame
return None
def cutOutFaces(frame, targetDir):
for faceNr, face in enumerate(frame.getFaces()):
print(faceNr, face)
img = face.getFaceImg()
faceImgPath = os.path.join(targetDir, frame.name + "-%s.jpg" % face.id)
print(faceImgPath)
img.save(faceImgPath)
def validateJsonTimes():
lastTime = None
for frameNr, frame in loadFrames(args.frameOutput).items():
thisTime = frame.getJson()['t']
#print(frameNr, thisTime)
        if lastTime is not None and lastTime > thisTime:
            print "ERROR! Time error at %s. Restarted scanner there?" % frameNr
lastTime = thisTime
def sumEmotions():
total = 0.
summed = 0.
items = 0
for frameNr, frame in loadFrames(args.frameOutput).items():
for face in frame.getFaces():
total += abs(face.data['valence'])
summed += face.data['valence']
items += 1
    if items == 0:
        print("No faces found in any frame")
        return
    average = summed / items
    print("Total emotion %d, positivity score %d (average: %s)" % (total, summed, average))
def getMostDisonant(nr = 5):
for frameNr, frame in loadFrames(args.frameOutput).items():
frame.updateDisonanceScores()
faces.sort(key=lambda x: x.disonanceScore, reverse=True)
mostDisonantFaces = faces[:nr]
for face in mostDisonantFaces:
print("Frame %d, face %d, score %d, valence %d" % (face.frame.nr, face.id, face.disonanceScore, face.data['valence']))
face.getFaceImg().show()
def getAnomalies(params, nr = 5):
for frameNr, frame in loadFrames(args.frameOutput).items():
frame.updateAnomalyScores(params)
faces.sort(key=lambda x: x.anomalyScore, reverse=True)
anomalies = faces[:nr]
for face in anomalies:
print("Frame %d, face %d, score %d" % (face.frame.nr, face.id, face.anomalyScore))
#~ getCharacteristicVector
face.getFaceImg().show()
def printFrameStats(frame, params):
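    """
    Clear the terminal and print a live summary of the frame: timestamp,
    face count, and a min/q1/median/q3/max table for every selected
    facial parameter.
    """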
os.system('clear')
print(time.time())
print( ("Nr: %d" % frame.nr).ljust(40) + ("t: {}".format(frame.getJson()['t'])) )
#~ print
faces = frame.getFaces()
print("Faces: %d" % len(faces))
if len(faces) < 1:
return
print " ".ljust(20), "0%".rjust(13), "q1".rjust(13), "median".rjust(13), "q3".rjust(13), "100%".rjust(13)
    for p in params:
        values = np.array([f.data[p] for f in faces])
        q0, q1, q2, q3, q4 = np.percentile(values, [0, 25, 50, 75, 100])
        print p.ljust(20), ("%f%%" % q0).rjust(13), ("%f%%" % q1).rjust(13), ("%f%%" % q2).rjust(13), ("%f%%" % q3).rjust(13), ("%f%%" % q4).rjust(13)
#~ TODO: speaker stats
frame.updateDisonanceScores()
dissonantFace = max(faces,key=lambda f: f.disonanceScore)
#~ dissonantFace.getFaceImg()
def monitorStatus(frameDir, params):
while True:
frame = getLastFrame(frameDir)
        if frame is not None:
printFrameStats(frame, params)
# don't check too often
time.sleep(.5)
def playWindowStopmotion(window):
"""
Play a set of sliding window frames as stop motion video
"""
root = Tkinter.Tk()
root.geometry('%dx%d+%d+%d' % (1000,1000,0,0))
canvas = Tkinter.Canvas(root,width=1000,height=1000)
canvas.pack()
old_label_image = None
for frame in window.frames:
image = frame.getImg()
basewidth = 1000
wpercent = (basewidth / float(image.size[0]))
hsize = int((float(image.size[1]) * float(wpercent)))
image = image.resize((basewidth, hsize), Image.ANTIALIAS)
tkpi = ImageTk.PhotoImage(image)
canvas.delete("IMG")
imagesprite = canvas.create_image(500,500,image=tkpi, tags="IMG")
root.update()
time.sleep(1)
validateJsonTimes()
if args.sum:
sumEmotions()
if args.disonant:
getMostDisonant()
if args.cutAllFaces:
faceDir = os.path.join(args.frameOutput, 'faces')
if not os.path.exists(faceDir):
os.mkdir(faceDir)
for frameNr, frame in loadFrames(args.frameOutput).items():
        cutOutFaces(frame, faceDir)
if args.unique:
collection = WindowCollection(args.window_size, frames)
windows = collection.getUniqueWindows(args.params)
#~ print(windows)
playWindowStopmotion(windows[0])
if args.avg:
collection = WindowCollection(args.window_size, frames)
windows = collection.getMostAvgWindows(args.params)
#~ print(windows)
playWindowStopmotion(windows[0])
if args.status:
monitorStatus(args.frameOutput, args.params)