import sqlite3
import asyncio
import logging

mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("variableStore")


class Variable:
    """A single named value captured for one run, waiting to be stored."""

    def __init__(self, name: str, value: str, hugveyId: int, languageCode: str, runId: str):
        self.name = name
        self.value = value
        self.hugveyId = hugveyId
        self.languageCode = languageCode
        self.runId = runId


class VariableStore:
    """SQLite-backed store for variables.

    Writes are decoupled from callers: :meth:`addVariable` only puts the
    variable on an asyncio queue, and :meth:`queueProcessor` (which must be
    scheduled on an event loop by the owner of this object) takes items off
    that queue and writes them to the database.
    """

    def __init__(self, db_filename):
        """Open (or create) the database and make sure the schema exists.

        :param db_filename: value passed straight to ``sqlite3.connect``.
        """
        self.conn = sqlite3.connect(db_filename, check_same_thread=False)

        # Make sure the table and its indexes exist.
        createSqls = [
            """
            CREATE TABLE IF NOT EXISTS `variables` (
                `run_id` VARCHAR(32),
                `name` VARCHAR(255),
                `hugvey` INTEGER,
                `language_code` VARCHAR(100),
                `createdAt` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                `val` VARCHAR(1024)
            );
            """,
            """
            CREATE INDEX IF NOT EXISTS `name_time` ON `variables` (
                `language_code`,
                `name`,
                `createdAt` DESC
            );
            """,
            # This unique index is what lets INSERT OR REPLACE in
            # queueProcessor overwrite an earlier value of the same run.
            """
            CREATE UNIQUE INDEX IF NOT EXISTS `unique_per_run` ON `variables` (
                `run_id`,
                `name`
            );
            """,
        ]
        cur = self.conn.cursor()
        try:
            for sql in createSqls:
                cur.execute(sql)
        finally:
            cur.close()
        self.conn.commit()

        self.q = asyncio.Queue()

    def addVariable(self, runId, name, value, hugveyId, languageCode):
        """Queue a variable for storage; returns without touching the database.

        Nothing is written until :meth:`queueProcessor` picks the item up.
        """
        logger.debug(f"Queing storing of {name} for {hugveyId} ({languageCode}) - run {runId}")
        self.q.put_nowait(Variable(name, value, hugveyId, languageCode, runId))

    async def queueProcessor(self):
        """Take queued variables off the queue and write them, forever.

        Each variable is written with INSERT OR REPLACE and committed on its
        own. Database errors propagate to whoever awaits this coroutine.
        """
        while True:
            #: :var v: Variable
            v = await self.q.get()
            logger.info(f"Store variable {v.name} for {v.hugveyId} ({v.languageCode}): '{v.value}' - run {v.runId}")
            c = self.conn.cursor()
            try:
                # use runId to update, rather than insert a variable for the same run.
                c.execute(
                    "INSERT OR REPLACE INTO variables (run_id, name, hugvey, language_code, createdAt, val) VALUES (?,?,?, ?, current_timestamp,?)",
                    (v.runId, v.name, v.hugveyId, v.languageCode, v.value),
                )
                self.conn.commit()
            finally:
                # Release the cursor even when the write fails.
                c.close()

    def getLastOfName(self, name, languageCode, n = 10, unique = False):
        """Return up to ``n`` stored values of ``name`` for ``languageCode``.

        Rows are ordered by ``createdAt`` descending.

        :param name: variable name to look up.
        :param languageCode: only rows stored with this language code are used.
        :param n: maximum number of values to return.
        :param unique: when true, query with ``SELECT DISTINCT``.
        :return: list of the ``val`` column values (empty if nothing matches).
        """
        # Log through the module logger so the message follows the "hugvey"
        # logger configuration instead of going to the root logger.
        logger.debug(f"Get last {n} stored variables of {name} for {languageCode}")
        distinct = "DISTINCT" if unique else ""
        cur = self.conn.cursor()
        try:
            cur.execute(
                f"SELECT {distinct} val FROM variables WHERE language_code = ? AND name = ? ORDER BY createdAt DESC LIMIT ?",
                (languageCode, name, n),
            )
            values = [row[0] for row in cur.fetchall()]
        finally:
            cur.close()
        return values