143 lines
3.6 KiB
Python
143 lines
3.6 KiB
Python
import os
|
|
from PIL import Image, ImageDraw
|
|
import argparse
|
|
import json
|
|
|
|
|
|
parser = argparse.ArgumentParser(description='Parses opencv-webcam-demo json output files and collects statistics')
|
|
parser.add_argument('--frameOutput', '-o', required=True, help='directory to look for frames & json')
|
|
|
|
args = parser.parse_args()
|
|
|
|
faces = []
|
|
class Face:
|
|
def __init__(self, frame, data):
|
|
self.id = data['id']
|
|
self.frame = frame # Frame class
|
|
self.data = data # json data
|
|
self.disonanceScore = None
|
|
|
|
def getFaceImg(self):
|
|
r = self.data['rect']
|
|
return self.frame.getImg().crop((int(r['x']), int(r['y']), int(r['x']+r['w']), int(r['y']+r['h'])))
|
|
|
|
class Frame:
|
|
"""
|
|
Everything for an analysed frame
|
|
"""
|
|
def __init__(self, outputPath, nr):
|
|
self.outputPath = outputPath
|
|
self.nr = nr
|
|
self.name = "frame%06d" % nr
|
|
self.jsonPath = os.path.join(outputPath, ("frame%06d" % (nr)) + ".json")
|
|
self.imgPath = os.path.join(outputPath, self.name + ".jpg")
|
|
self.faces = None # init with getFaces
|
|
|
|
def getTime(self):
|
|
return os.path.getmtime(self.imgPath)
|
|
|
|
def getJson(self):
|
|
#~ try:
|
|
with open(self.jsonPath) as fp:
|
|
return json.load(fp)
|
|
#~ except Exception as e:
|
|
#~ # no json file yet?
|
|
#~ return None
|
|
|
|
def getImg(self):
|
|
return Image.open(self.imgPath)
|
|
|
|
def getFaces(self):
|
|
if self.faces is None:
|
|
j = self.getJson()
|
|
|
|
self.faces = [Face(self, f) for f in j['faces']]
|
|
faces.extend(self.faces)
|
|
|
|
return self.faces
|
|
|
|
def updateDisonanceScores(self):
|
|
totalValence = 0.0
|
|
totalFaces = 0
|
|
for face in self.getFaces():
|
|
totalValence += face.data['valence']
|
|
totalFaces += 1
|
|
|
|
if totalFaces == 0:
|
|
return
|
|
|
|
avgValence = totalValence / totalFaces
|
|
|
|
for face in self.getFaces():
|
|
face.disonanceScore = abs(face.data['valence'] - avgValence)
|
|
|
|
|
|
|
|
def exists(self):
|
|
return os.path.exists(self.jsonPath) and os.path.exists(self.imgPath)
|
|
|
|
frames = {}
|
|
|
|
def loadFrames(frameDir):
|
|
global frames
|
|
nr = 2
|
|
nextFrame = Frame(frameDir, nr)
|
|
# TODO; make threaded and infinite loop that updates global frames
|
|
while nextFrame.exists():
|
|
frames[nr] = nextFrame
|
|
nr+=1
|
|
nextFrame = Frame(frameDir, nr)
|
|
return frames
|
|
|
|
def cutOutFaces(frame, targetDir):
|
|
for faceNr, face in enumerate(frame.getFaces()):
|
|
print(faceNr, face)
|
|
img = face.getFaceImg()
|
|
faceImgPath = os.path.join(targetDir, frame.name + "-%s.jpg" % face.id)
|
|
print(faceImgPath)
|
|
img.save(faceImgPath)
|
|
pass
|
|
|
|
frames = loadFrames(args.frameOutput)
|
|
|
|
lastTime = None
|
|
for frameNr, frame in frames.items():
|
|
thisTime = frame.getJson()['t']
|
|
#print(frameNr, thisTime)
|
|
if not (lastTime is None) and lastTime > thisTime:
|
|
print "ERRROR!! Time error at %s. Restarted scanner there?" % frameNr
|
|
lastTime = thisTime
|
|
|
|
faceDir = os.path.join(args.frameOutput, 'faces')
|
|
|
|
if not os.path.exists(faceDir):
|
|
os.mkdir(faceDir)
|
|
|
|
def sumEmotions():
|
|
total = 0.
|
|
summed = 0.
|
|
items = 0
|
|
for frameNr, frame in frames.items():
|
|
for face in frame.getFaces():
|
|
total += abs(face.data['valence'])
|
|
summed += face.data['valence']
|
|
items += 1
|
|
|
|
average = summed / items
|
|
print ("Total emotion %d, positivity score %d (average: %s)" % (total, summed, average))
|
|
|
|
def getMostDisonant(nr = 5):
|
|
for frameNr, frame in frames.items():
|
|
frame.updateDisonanceScores()
|
|
faces.sort(key=lambda x: x.disonanceScore, reverse=True)
|
|
|
|
mostDisonantFaces = faces[:5]
|
|
for face in mostDisonantFaces:
|
|
print("Frame %d, face %d, score %d, valence %d" % (face.frame.nr, face.id, face.disonanceScore, face.data['valence']))
|
|
face.getFaceImg().show()
|
|
|
|
|
|
sumEmotions()
|
|
getMostDisonant()
|
|
#~ for frameNr, frame in frames.items():
|
|
#~ cutOutFaces(frame, faceDir)
|