""" Foucault allows you to govern a set of commands, which can be useful for i.e. an art installation setting, in which different scripts need to work together. While one can be (re)started independently of the rest. """ import asyncio from collections import defaultdict import os import pathlib import random from subprocess import Popen import subprocess from tempfile import mkstemp import threading import time import tomli from typing import Optional from nicegui import ui import logging import tomli_w logger = logging.getLogger('procescontroller') numbers = [] processes = [] class Argument(): def __init__(self, name: str, enabled: bool = True, inline: bool = False, boolean: bool = False, options: list[str] = [], selected: list[str] = []): self.name = name self.enabled = enabled self.inline = inline self.boolean = boolean self.options = options self.selected = selected def as_list(self) -> list[str]: if not self.enabled: return [] l = [] if not self.inline: l.append(self.name) if not self.boolean: l += self.selected return l def select(self, option: str): self.selected = [option] def toggle(self): self.enabled = not self.enabled @classmethod def from_dict(cls, data: dict): return cls( data['name'], data.get('enabled', False), data.get('inline', False), data.get('boolean', False), data.get('options', []), data.get('selected', []), ) def to_dict(self) -> dict: return self.__dict__ class SubprocessController(): def __init__(self, name: str, cmd: list[str], arguments: list[Argument], environment: dict[str,str] = {}, fn: Optional[pathlib.Path] = None): self.cmd = cmd self.arguments = arguments self.name = name self.filename = fn # self.tmp_config_fp, self.tmp_config_filename = mkstemp(suffix=".yml", prefix=f"config_{self.name}_") self.env_overrides = environment # dict[str, str] = { # "DISPLAY": ":1" # } self.proc=None self.running_config_state: Optional[str] = None self.saved_state = self.config_state() @classmethod def from_toml(cls, filename: os.PathLike): logger.info(f"Load from TOML {filename}") path = pathlib.Path(filename) name = path.stem args = [] with path.open('rb') as fp: data = tomli.load(fp) # print(data) for arg in data.get('arguments', []): args.append(Argument.from_dict(arg)) sc = cls( name, [data['cmd']] if data['cmd'] is str else data['cmd'], args, data.get('environment', {}), path ) return sc def to_dict(self): return { "cmd": self.cmd, "arguments": [a.to_dict() for a in self.arguments], "environment": self.env_overrides, } def update_config(self): pass def get_environment(self): default_env = os.environ.copy() return default_env | self.env_overrides def get_environment_strs(self): return [f"{k}=\"{v}\"" for k, v in self.env_overrides.items()] def rm_env(self, key): if key in self.env_overrides: del self.env_overrides[key] def add_env(self, key, value): self.env_overrides[key] = value def get_arguments(self)-> list[str]: r = [] for a in self.arguments: r.extend(a.as_list()) return r def as_bash_string(self): return " " . join(self.get_environment_strs() + self.cmd + self.get_arguments()) def run(self): logger.info(f"Run: {self.as_bash_string()}") self.running_config_state = self.as_bash_string() self.proc = Popen( self.cmd + self.get_arguments(), env=self.get_environment(), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def is_stale(self): """Config has changed since starting the process""" return self.is_running() and self.running_config_state != self.as_bash_string() def quit(self): if self.is_running(): self.proc.terminate() def restart(self): self.quit() self.run() def return_code(self) -> Optional[int]: if self.proc is None: return None return self.proc.returncode def is_running(self): return self.proc is not None and self.proc.poll() is None def state(self): return "::".join( [self.name, str(self.is_running()), str(self.return_code())] ) + "::" + self.config_state() + (self.running_config_state if self.running_config_state else "") def config_state(self): return "::".join( self.cmd + self.get_arguments() + ["+".join(a.options) for a in self.arguments] + [str(self.filename.stat().st_mtime) if self.filename else ""] + list(self.env_overrides.values()) ) def save(self) -> str: # print(self.to_dict()) with self.filename.open('wb') as fp: tomli_w.dump(self.to_dict(), fp) self.saved_state = self.config_state() def unsaved_changes(self): return self.config_state() != self.saved_state class Foucault(): def __init__(self): self.processes: list[SubprocessController] = [] self.uis: list[SubprocessUI] = [] self.directories: list[pathlib.Path] = [] def add_process(self, sc: SubprocessController): logger.info(f"add process {sc.as_bash_string()}") self.processes.append(sc) self.uis.append(SubprocessUI(sc)) self.ui.refresh() def watch(self, path: os.PathLike): path = pathlib.Path(path).resolve() if path.is_dir(): if path in self.directories: logger.warning(f"Path already watched {path}") self.directories.append(path) self.update_watched() elif path.suffix == 'toml': self.add_process(SubprocessController.from_toml(path)) else: raise RuntimeError(f"Not a valid path {path}") def update_watched(self): # TODO)) It would be great to use e.g. Watchdog fns = [scu.sc.filename for scu in self.uis] for d in self.directories: config_files = d.glob("**/*.toml") for path in config_files: if path not in fns: self.add_process(SubprocessController.from_toml(path)) def run_all(self): for p in self.processes: p.run() def stop_all(self): for p in self.processes: p.quit() def restart_all(self): # when restarting all, make sure all are stopped # before starting, so we start with a clean slate self.stop_all() self.run_all() @ui.refreshable_method def ui(self): with ui.row(): ui.button('▶', on_click=self.run_all) ui.button('■', on_click=self.stop_all) ui.button('↺', on_click=self.restart_all) with ui.grid(columns="40em 40em 40em"): for proc_ui in self.uis: proc_ui.ui() def run(self): # t = threading.Thread(target=reloader, daemon=True) t = threading.Thread(target=self.monitor, daemon=True) t.start() #build ui: self.ui() ui.page_title("Conduct of conduct") ui.run(show=False, favicon="🫣") def monitor(self): states = defaultdict(lambda: "") i=0 while True: i+=1 # if we're only interested in running, we can use os.waitpid(), but we also check config changes time.sleep(.3) self.update_watched() for proc_ui in self.uis: state = proc_ui.sc.state() if state != states[proc_ui.sc]: proc_ui.ui.refresh() states[proc_ui.sc] = state class SubprocessUI: def __init__(self, sc: SubprocessController): self.sc = sc @ui.refreshable_method def ui(self): card_class = "bg-teal" if self.sc.is_running() else ("bg-warning" if self.sc.return_code() and math.abs(self.sc.return_code()) != 15 else "bg-gray") with ui.card().classes(card_class): with ui.row(align_items="stretch").classes('w-full'): if self.sc.return_code(): ui.label(f'⚠').classes('text-h5') ui.label(f'{self.sc.name}').tooltip(str(self.sc.filename)).classes('text-h5') ui.space() with ui.button_group(): ui.button('▶', on_click=self.sc.run) ui.button('■', on_click=self.sc.quit) ui.button('↺', on_click=self.sc.restart, color='red' if self.sc.is_stale() else "primary") ui.label(f'Running: {self.sc.is_running()}' + f"({self.sc.return_code()})") ui.code(f'{" ".join(self.sc.cmd)}') ui.code(self.sc.as_bash_string()) ui.separator() with ui.row().classes('w-full'): edit = ui.switch(value=self.sc.unsaved_changes()) ui.space() ui.button('🖫', on_click=self.sc.save, color="red" if self.sc.unsaved_changes() else 'lightgray').classes('text-lg') with ui.grid().bind_visibility_from(edit, 'value'): ui.label('Arguments').classes('text-h6') for i, argument in enumerate(self.sc.arguments): with ui.card().classes('w-full'): with ui.row(align_items='center').classes('w-full'): ui.button("⊗", on_click=lambda i=i: self.sc.arguments.pop(i)) ui.label(f'{argument.name}').classes('text-stone-400' if argument.inline else '') ui.space() ui.switch(value=argument.enabled, on_change=lambda i=argument: i.toggle()) if not argument.boolean: with ui.dropdown_button(f"{len(argument.options)} | " + ' '.join(argument.selected), auto_close=False).classes('normal-case'): # ui.badge('0', color='red').props('floating') for o in argument.options: # TODO)) Remove ui.item(o +'i', on_click=(lambda a=argument, o=o: a.select(o) )) with ui.item(): ui.input(placeholder="add new").on('keydown.enter', (lambda e, a=argument: a.options.append(e.sender.value) or print(a.options))) # ui.label(f'{argument.name}') # with ui.row(): with ui.card(): with ui.row().classes('w-full'): name = ui.input(placeholder="argument").classes('w-full')#.on("keydown.enter", ) with ui.row(): enabled = ui.switch('enabled', value=True) positional = ui.checkbox('positional') #inline boolean = ui.checkbox('boolean') ui.button("+", on_click=lambda e, name=name, enabled=enabled, positional=positional, boolean=boolean: self.sc.arguments.append(Argument( name.value, enabled.value, positional.value, boolean.value ))) ui.label('Environment variables').classes('text-h6') with ui.card(): for k, v in self.sc.env_overrides.items(): with ui.row(align_items="center"): ui.button("⊗", on_click=lambda k=k: self.sc.rm_env(k)) ui.label(f"{k}=\"{v}\"") with ui.row(): name = ui.input(placeholder="name")#.on("keydown.enter", ) value = ui.input(placeholder="value")#.on("keydown.enter", ) ui.button("+", on_click=lambda e, name=name, enabled=value: self.sc.add_env(name.value, value.value))