"""NiceGUI dashboard that starts, stops and monitors external subprocesses."""

import asyncio
import os
import pathlib
import random
from subprocess import PIPE, Popen
from tempfile import mkstemp
import threading
import time

from nicegui import ui

# Scratch list mutated by the number demo helpers (add_number / reloader).
numbers = []
# Not referenced anywhere else in this file; kept in case other code reads it.
processes = []


class SubprocessController:
    """Start, stop and poll one external command.

    Args:
        name: Label for the process; SubprocessUI shows it on the card.
        cmd: Argument vector passed unchanged to ``subprocess.Popen``.
    """

    def __init__(self, name: str, cmd: list[str]):
        self.cmd = cmd
        self.name = name
        # self.tmp_config_fp, self.tmp_config_filename = mkstemp(suffix=".yml", prefix=f"config_{self.name}_")
        # Variables layered on top of the inherited environment at launch.
        self.env_overrides: dict[str, str] = {"DISPLAY": ":1"}
        # Popen handle of the most recent launch; None until run() is called.
        self.proc = None

    def update_config(self) -> None:
        """Placeholder hook; currently does nothing."""

    def get_environment(self) -> dict[str, str]:
        """Return a copy of ``os.environ`` with ``env_overrides`` applied on top."""
        return os.environ.copy() | self.env_overrides

    def run(self) -> None:
        """Launch ``cmd`` as a child process unless one is already running.

        Starting a second child while the first is alive would overwrite
        ``self.proc`` and leave the first process with no handle to stop it,
        so a repeated call (e.g. a double click on "Run") is a no-op.

        Raises:
            OSError: Propagated from ``Popen`` if the command cannot be started.
        """
        if self.is_running():
            return
        # NOTE(review): nothing in this file reads these pipes; a child that
        # writes a lot may stall once the OS pipe buffer fills - confirm
        # whether the output is consumed elsewhere.
        self.proc = Popen(
            self.cmd,
            env=self.get_environment(),
            stdout=PIPE,
            stderr=PIPE,
        )

    def quit(self) -> None:
        """Send a terminate request to the child if it is still alive."""
        if self.is_running():
            self.proc.terminate()

    def is_running(self) -> bool:
        """Return True when a child was started and has not exited yet."""
        return self.proc is not None and self.proc.poll() is None


class SubprocessUI:
    """Refreshable NiceGUI card for a single SubprocessController."""

    def __init__(self, sc: SubprocessController):
        self.sc = sc

    @ui.refreshable_method
    def ui(self):
        """Render the card: name, running state, and Run/Stop buttons."""
        with ui.card():
            ui.label(f'{self.sc.name}')
            ui.label(f'Running: {self.sc.is_running()}')
            ui.button('Run', on_click=self.sc.run)
            ui.button('Stop', on_click=self.sc.quit)


class Foucault:
    """Registry of controllers together with their UI cards."""

    def __init__(self):
        self.processes: list[SubprocessController] = []
        self.uis: list[SubprocessUI] = []

    def add_process(self, sc: SubprocessController) -> None:
        """Register ``sc`` and create the card that displays it."""
        self.processes.append(sc)
        self.uis.append(SubprocessUI(sc))

    def ui(self) -> None:
        """Render every registered card inside one row."""
        with ui.row():
            for proc_ui in self.uis:
                proc_ui.ui()

    def run(self) -> None:
        """Refresh all cards once per second, forever.

        Intended as a thread target (the module starts it as a daemon thread).
        """
        # NOTE(review): this refreshes UI elements from a plain thread; confirm
        # that is supported by the NiceGUI version in use, or consider ui.timer.
        while True:
            time.sleep(1)
            for proc_ui in self.uis:
                proc_ui.ui.refresh()


conductofconduct = Foucault()


@ui.refreshable
def number_ui() -> None:
    """Show the running state of every registered process as one label."""
    print(conductofconduct.processes)
    states = [str(process.is_running()) for process in conductofconduct.processes]
    ui.label("test: " + ','.join(states))
    # ui.label(', '.join(str(n) for n in sorted(conductofconduct.processes)))


def add_number() -> None:
    """Append a random integer in [0, 100] to ``numbers`` and refresh number_ui."""
    numbers.append(random.randint(0, 100))
    number_ui.refresh()


def reloader():
    """Test reloading"""
    for i in range(200):
        print("add")
        numbers.append(i)
        time.sleep(1)
        number_ui.refresh()


conductofconduct.add_process(SubprocessController("test1", ["tail", '-f', "gui.py"]))
conductofconduct.add_process(SubprocessController("test2", ["tail", '-f', "gui.py"]))

conductofconduct.ui()

# Disabled alternative wiring for the number demo:
# t = threading.Thread(target=reloader, daemon=True)
t = threading.Thread(target=conductofconduct.run, daemon=True)
t.start()
# number_ui()
# ui.button('Add random number', on_click=add_number)

ui.run(show=False)