# hugvey/hugvey/tools.py
#
# 330 lines
# 15 KiB
# Python

import logging
import yaml
import os
import json
from hugvey.voice import VoiceStorage
import re
import csv
from operator import indexOf
import operator
# Module-level logger registered under the name 'toolbox'; the log calls below use it.
logger = logging.getLogger('toolbox')
# From https://stackoverflow.com/a/1165552
class DictDiffer(object):
    """
    Compare the keys and values of two dictionaries.

    Given a "current" and a "past" dictionary, the methods report:
    (1) keys that were added (only in current)
    (2) keys that were removed (only in past)
    (3) keys present in both whose values differ
    (4) keys present in both whose values are equal
    """

    def __init__(self, current_dict, past_dict):
        """Store both dictionaries and precompute their key sets."""
        self.current_dict = current_dict
        self.past_dict = past_dict
        self.set_current = set(current_dict.keys())
        self.set_past = set(past_dict.keys())
        # Keys that occur in both dictionaries.
        self.intersect = self.set_current & self.set_past

    def added(self):
        """Return the set of keys that exist only in the current dictionary."""
        return self.set_current.difference(self.intersect)

    def removed(self):
        """Return the set of keys that exist only in the past dictionary."""
        return self.set_past.difference(self.intersect)

    def changed(self):
        """Return the set of shared keys whose values differ."""
        return {
            key for key in self.intersect
            if self.past_dict[key] != self.current_dict[key]
        }

    def unchanged(self):
        """Return the set of shared keys whose values are equal."""
        return {
            key for key in self.intersect
            if self.past_dict[key] == self.current_dict[key]
        }
class Toolbox:
    """
    Maintenance helpers for the story (language) JSON files and their audio.

    The YAML config file is loaded on construction, together with every
    story file listed under its ``languages`` key. The public methods then
    validate/repair a story file, export and import translation CSVs,
    list or clean up audio files, and print timing info from a cutelog dump.
    """

    # Column order of the CSV written by generate_story_csv(); the columns
    # 'id', 'text', 'translation' and 'regex_translation' are required again
    # by import_story_csv().
    csv_fieldnames = ['id', 'type', 'color', 'text', 'regex', 'to text', 'translation', 'regex_translation']

    def __init__(self, configFile):
        """
        Load the YAML config at ``configFile`` and all configured story files.

        Sets ``config``, ``hugvey_ids`` (1..config['hugveys']), ``languages``,
        ``languageFiles``, ``languageConfig`` and ``voiceStorage``.
        """
        self.languageFiles = {}
        self.languageConfig = {}

        with open(configFile, 'r') as fp:
            logger.debug('Load config from {}'.format(configFile))
            self.config = yaml.safe_load(fp)

        self.hugvey_ids = list(range(1, self.config['hugveys'] + 1))
        self.loadLanguages()

        voice_dir = os.path.join(self.config['web']['files_dir'], 'voices')
        self.voiceStorage = VoiceStorage(voice_dir, self.languageConfig)

    def loadLanguages(self):
        """
        (Re)load every story file listed in ``config['languages']``.

        Fills ``languages`` (code -> parsed JSON), ``languageFiles``
        (code -> file name) and ``languageConfig`` (code -> config entry).

        Raises:
            Exception: when a language token still has the value 'LB_TOKEN'
                or 'SECRET_KEY' (presumably the placeholders of a sample
                config -- confirm against the example config file).
        """
        logger.debug('load language files')
        self.languages = {}
        files_dir = self.config['web']['files_dir']

        for lang in self.config['languages']:
            code = lang['code']
            self.languageFiles[code] = lang['file']
            self.languageConfig[code] = lang

            with open(os.path.join(files_dir, lang['file']), 'r') as fp:
                self.languages[code] = json.load(fp)

            if lang['token'] in ('LB_TOKEN', 'SECRET_KEY'):
                raise Exception("Are you using the right config file? Language key not configured properly!")

    def get_audio_filenames(self):
        """
        Get all audio files as defined through the config.

        Returns a list that starts with 'local/crash.wav', followed by one
        entry per 'Msg' node of every loaded story: the message's own audio
        file when it has one, otherwise the file name the voice storage
        assigns to its text. Messages whose text contains '$' are skipped.
        """
        filenames = ['local/crash.wav']

        for langCode, story in self.languages.items():
            logger.info(f'lang {langCode}')
            for msg in story:
                if msg['@type'] != 'Msg':
                    continue
                if msg.get('audio') is not None:
                    filenames.append(msg['audio']['file'])
                    continue
                if '$' in msg['text']:
                    # skip variable texts
                    continue
                filenames.append(self.voiceStorage.getFilename(langCode, msg['text'], False))

        return filenames

    def get_existing_filesnames(self):
        """Return the paths of all '.wav' files found below ``config['web']['files_dir']``."""
        return [
            os.path.join(path, name)
            for path, _subdirs, files in os.walk(self.config['web']['files_dir'])
            for name in files
            if name.endswith('.wav')
        ]

    def clean_audio_files(self):
        """
        Delete every existing .wav file that no story needs any more.

        Existing files are compared, by exact path string, with the result of
        get_audio_filenames(); files not in that list are removed from disk.
        The number of needed-but-missing files is logged afterwards.
        """
        needed_files = self.get_audio_filenames()
        existing_files = self.get_existing_filesnames()

        # Sets make the membership tests below O(1) instead of a list scan.
        needed = set(needed_files)
        existing = set(existing_files)

        for fn in existing_files:
            if fn in needed:
                logger.debug(f"Keep {fn}")
            else:
                logger.warning(f"Remove {fn}")
                os.unlink(fn)

        missingFiles = [fn for fn in needed_files if fn not in existing]
        logger.info("{} files missing".format(len(missingFiles)))

    @classmethod
    def find_direction_for_condition(cls, conditionId, story):
        """
        Return the first 'Direction' item in ``story`` that lists
        ``conditionId`` in its 'conditions', or None when there is none.
        """
        for item in story:
            if item['@type'] == 'Direction' and conditionId in item['conditions']:
                return item
        return None

    def _story_filename(self, lang_code):
        """Return the story file path for ``lang_code``, or None (after logging) for an unknown code."""
        if lang_code not in self.languages:
            logger.critical("Invalid langauge code")
            logger.warning(f"Valid codes are {' '.join(self.languages.keys())}")
            return None
        return os.path.join(self.config['web']['files_dir'], self.languageFiles[lang_code])

    @staticmethod
    def _index_by_id(story):
        """Map '@id' -> node; on duplicate ids the first node in story order wins."""
        index = {}
        for node in story:
            index.setdefault(node['@id'], node)
        return index

    @staticmethod
    def _fix_direction(direction, itemsPerId):
        """Collapse an embedded source message to its id and report dangling condition ids."""
        source = direction['source']
        if isinstance(source, dict):
            logger.warning(f"Fixing broken direction {direction['@id']}, please check if everything still works!")
            validMsg = itemsPerId.get(source.get('@id'))
            if validMsg is None:
                # Nothing valid to point at: report it instead of aborting the whole fix run.
                logger.critical(f"Direction {direction['@id']} embeds a source message that is not part of the story; leaving it untouched")
            else:
                diff = DictDiffer(source, validMsg)
                if diff.changed() or diff.added() or diff.removed():
                    logger.warning("Changes found between messages")
                    logger.warning(f"Changed: {list(diff.changed())} Keys that will be remove: {list(diff.added())} Keys that will be added: {list(diff.removed())}")
                    # NOTE(review): the reviewed copy had lost its indentation; these two
                    # info lines are assumed to belong to the "changes found" branch -- confirm.
                    logger.info(f"Direction pointed to {source}")
                    logger.info(f"Will now point to {validMsg}")
                direction['source'] = source['@id']

        for conditionId in direction['conditions']:
            if conditionId not in itemsPerId:
                logger.critical(f"Direction {direction['@id']} refers to non-existing condition {conditionId}! (This will result in a crash when playing the message)")

    def _check_condition(self, condition, story, itemsPerId):
        """Validate one condition; return False when no direction refers to it (an orphan)."""
        direction = self.find_direction_for_condition(condition['@id'], story)
        if not direction:
            return False

        if condition['type'] == 'messagePlayed':
            msgId = condition['vars']['msgId'].strip()
            if msgId not in itemsPerId:
                logger.warning(f"Message played condition for non-existing message {msgId} when going from {direction['source']} to {direction['target']}! (this will ignore the condition)")

        if condition['type'] == 'replyContains':
            regex = (condition['vars'].get('regex') or '').rstrip()
            if regex:
                try:
                    re.compile(regex)
                except (re.error, RecursionError, OverflowError) as e:
                    logger.critical(f"Invalid regex for condition {condition['@id']}: {regex}")
                    logger.exception(e)

        return True

    def fix_story_file(self, lang_code):
        """
        Check the story of ``lang_code`` for known problems and rewrite its file.

        Reports a missing or duplicated beginning, replaces direction sources
        that hold a whole message object by that message's id, and logs
        dangling condition/message references and invalid regexes. The
        (possibly modified) story is then written back as JSON. Returns None;
        for an unknown language code nothing is written.
        """
        filename = self._story_filename(lang_code)
        if filename is None:
            return

        story = self.languages[lang_code]

        beginnings = [item for item in story if item.get('beginning') is True]
        if len(beginnings) < 1:
            logger.critical("No beginning set")
        if len(beginnings) > 1:
            beginningIds = [i['@id'] for i in beginnings]
            logger.warning(f"{len(beginnings)} beginning messages configured. Set only one of {beginningIds}")

        itemsPerId = {item['@id']: item for item in story}
        orphans = 0
        for item in story:
            if item['@type'] == 'Direction':
                self._fix_direction(item, itemsPerId)
            elif item['@type'] == 'Condition':
                if not self._check_condition(item, story, itemsPerId):
                    # Orphaned conditions are only counted. Removing them was left
                    # disabled by the original author ("This should be fine, but I
                    # don't dare to do it yet..."), so the story keeps them.
                    orphans += 1

        logger.debug(f"Can clear {orphans} orphaned conditions (uncomment code in tools.py)")

        with open(filename, 'w') as fp:
            json.dump(story, fp, indent=2)
        logger.info(f"Wrote to {filename}")

    def generate_story_csv(self, lang_code):
        """
        Export the translatable parts of a story to ``<story file>.csv``.

        Rows are written per message (sorted by colour), each followed by the
        non-empty 'replyContains' regexes of the directions leaving that
        message, and finally the non-empty regexes of 'reply_contains'
        diversions. Dangling target/condition ids are logged and skipped
        rather than aborting the export. Returns None.
        """
        filename = self._story_filename(lang_code)
        if filename is None:
            return

        story = self.languages[lang_code]
        csv_filename = filename + '.csv'
        logger.info(f"Write csv of {lang_code} to {csv_filename}")

        nodes_by_id = self._index_by_id(story)

        with open(csv_filename, 'w', newline='') as fp:
            writer = csv.DictWriter(fp, fieldnames=self.csv_fieldnames)
            writer.writeheader()

            msgs = sorted(
                (node for node in story if node['@type'] == 'Msg'),
                key=lambda m: m.get('color') or '',
            )
            for msg in msgs:
                writer.writerow({'id': msg['@id'], 'type': 'Msg', 'color': msg.get('color') or '', 'text': msg['text']})

                for direction in story:
                    if direction['@type'] != 'Direction' or direction['source'] != msg['@id']:
                        continue

                    target = nodes_by_id.get(direction['target'])
                    if target is None:
                        logger.warning(f"Direction {direction['@id']} points to non-existing message {direction['target']}")
                    target_text = target['text'] if target is not None else ''

                    for conditionId in direction['conditions']:
                        condition = nodes_by_id.get(conditionId)
                        if condition is None:
                            logger.warning(f"Direction {direction['@id']} refers to non-existing condition {conditionId}")
                            continue
                        if condition['type'] != 'replyContains':
                            continue
                        regex = condition['vars'].get('regex')
                        if not regex:
                            continue
                        writer.writerow({'id': condition['@id'], 'type': 'Condition', 'regex': regex, 'to text': target_text})

            for diversion in story:
                if diversion['@type'] != 'Diversion' or diversion['type'] != 'reply_contains':
                    continue
                regex = diversion['params'].get('regex')
                if not regex:
                    continue
                writer.writerow({'id': diversion['@id'], 'type': 'Diversion', 'regex': regex})

        logger.info("Done")

    def import_story_csv(self, lang_code, csv_filename):
        """
        Apply the translations from ``csv_filename`` to the story of ``lang_code``.

        For 'Msg' rows the 'translation' column replaces the message text (the
        CSV's original text is kept as label when the node has none); for
        'Condition' and 'Diversion' rows the 'regex_translation' column
        replaces the regex. Empty translations of non-empty originals are
        skipped with a warning. The story file is rewritten afterwards.

        Raises:
            Exception: when a required column is missing, or a row refers to
                a node of an unknown type.
            IndexError: when a row's id does not exist in the story.
        """
        filename = self._story_filename(lang_code)
        if filename is None:
            return

        story = self.languages[lang_code]
        nodes_by_id = self._index_by_id(story)
        required = ('id', 'translation', 'regex_translation', 'text')

        logger.info(f"Writing translation from {csv_filename} to {filename}")

        # newline='' lets the csv module handle line endings inside quoted fields itself.
        with open(csv_filename, 'r', newline='') as fp:
            reader = csv.DictReader(fp)
            logger.info(reader.fieldnames)

            # fieldnames is None for an empty file.
            fieldnames = reader.fieldnames or []
            missing = [name for name in required if name not in fieldnames]
            if missing:
                raise Exception(f"Not all required fieldnames are given in csv: {', '.join(required)} (missing: {', '.join(missing)})")

            for row in reader:
                if not any(row.values()):
                    logger.info("Skipping empty row")
                    continue

                if not row['id']:
                    logger.critical(f"Skipping row without ID, but with data: {list(row.values())}")
                    continue

                node = nodes_by_id.get(row['id'])
                if node is None:
                    logger.critical(f"Exception finding node id {row}")
                    raise IndexError(f"No story node with id {row['id']!r}")

                # Rows shorter than the header yield None for the missing cells.
                translation = row['translation'] or ''
                regex_translation = row['regex_translation'] or ''

                if node['@type'] == 'Msg':
                    if not translation and node['text']:
                        logger.warning(f"Skipping empty translation for message {node['@id']} \"{node['text']}\"")
                        continue
                    if 'label' not in node or (not node['label'] and node['text'] == row['text']):
                        node['label'] = row['text']  # store original text as label for readability
                    node['text'] = translation
                elif node['@type'] == 'Condition':
                    current = node['vars'].get('regex') or ''
                    if not regex_translation and current:
                        logger.warning(f"Skipping empty translation for regex {node['@id']} \"{current}\"")
                        continue
                    node['vars']['regex'] = regex_translation
                elif node['@type'] == 'Diversion':
                    current = node['params'].get('regex') or ''
                    if not regex_translation and current:
                        logger.warning(f"Skipping empty translation for regex {node['@id']} \"{current}\"")
                        continue
                    node['params']['regex'] = regex_translation
                else:
                    raise Exception(f"Unknown type: {row}")

        with open(filename, 'w') as fp:
            json.dump(story, fp, indent=2)
        logger.info(f"Wrote to {filename}")

    def parse_cutelog(self, filename, hugvey_ids=None):
        """
        Print a per-hugvey timing overview from a JSON log dump.

        ``filename`` must hold a JSON list of log records; records are matched
        to a hugvey through a 'name' starting with 'hugvey.<id>.'. For each id
        the time between consecutive text records is printed, followed by the
        delay between each "Condition is met" record and the play record
        paired with it by position.

        Args:
            filename: path of the JSON log file.
            hugvey_ids: iterable of ids to report on; defaults to 1..29.
        """
        with open(filename, 'r') as fp:
            cutelog = json.load(fp)

        if hugvey_ids is None:
            hugvey_ids = range(1, 30)

        for hugvey_id in hugvey_ids:
            print(f"HUGVEY {hugvey_id}")
            prefix = f'hugvey.{hugvey_id}.'
            log = [i for i in cutelog if 'name' in i and i['name'].startswith(prefix)]
            with_msg = [i for i in log if 'msg' in i]

            txts = [
                i for i in with_msg
                if (i['msg'].startswith('Text: ') and i['msg'] != "Text: ")
                or i['msg'].startswith(('Current message', 'ignore'))
            ]

            last = None
            for txt in txts:
                # The first record only serves as reference point for the next one.
                if last:
                    if txt['msg'].startswith('Current'):
                        print('--------------------', txt['created'])
                    elif txt['msg'].startswith('ignore'):
                        print('/////////////////////', txt['created'])
                    else:
                        print(txt['created'] - last['created'], txt['msg'], txt['levelname'])
                last = txt

            tC = [i for i in with_msg if i['msg'].startswith("Condition is met")]
            tR = [i for i in with_msg if i['msg'].startswith("Received {'file")]
            tP = [i for i in with_msg if i['msg'].startswith("['play'")]

            # zip() stops at the shortest list, so an incomplete log no longer
            # raises IndexError when a play has no matching condition/receive record.
            for play, met, received in zip(tP, tC, tR):
                print(play['created'] - met['created'], play['msg'], met['msg'], received['msg'])

            print('===================')