- Google OAuth login with JWT session cookies, per-user project isolation
- Remote worker registration via WebSocket, execute_on_worker/list_workers agent tools
- File browser UI in workflow view, file upload/download API
- Deploy script switched to local build, added tori.euphon.cloud ingress
183 lines
5.7 KiB
Python
Executable File
183 lines
5.7 KiB
Python
Executable File
#!/usr/bin/env -S uv run --script
|
|
# /// script
|
|
# requires-python = ">=3.10"
|
|
# dependencies = ["websockets"]
|
|
# ///
|
|
"""tori-worker: connects to Tori server via WebSocket, reports hardware info, executes scripts."""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
|
|
import websockets
|
|
|
|
|
|
def get_cpu_info() -> str:
    """Return the CPU model name.

    The first ``model name`` entry of ``/proc/cpuinfo`` is preferred. If the
    file cannot be read or parsed, or has no such entry, the value falls back
    to ``platform.processor()`` and then ``platform.machine()``.
    """
    try:
        with open("/proc/cpuinfo") as cpuinfo:
            model_rows = (row for row in cpuinfo if row.startswith("model name"))
            first_row = next(model_rows, None)
            if first_row is not None:
                # Rows look like "model name<TAB>: <value>"; keep the value.
                _, value = first_row.split(":", 1)
                return value.strip()
    except Exception:
        # Best effort only: any failure here means "use the fallback".
        pass
    return platform.processor() or platform.machine()
|
|
|
|
|
|
def get_memory_info() -> str:
    """Return total system memory as a string such as ``"15.6 GB"``.

    The figure comes from the ``MemTotal`` row of ``/proc/meminfo`` (a value
    in kB) divided by 1024 * 1024 and shown with one decimal. ``"unknown"``
    is returned when the file cannot be read or parsed, or has no such row.
    """
    kb_per_gb = 1024 * 1024
    try:
        with open("/proc/meminfo") as meminfo:
            total_row = next(
                (row for row in meminfo if row.startswith("MemTotal")), None
            )
            if total_row is not None:
                # Second whitespace-separated field is the size in kB.
                total_kb = int(total_row.split()[1])
                return f"{total_kb / kb_per_gb:.1f} GB"
    except Exception:
        # Best effort only: any failure here means "unknown".
        pass
    return "unknown"
|
|
|
|
|
|
def get_gpu_info() -> str:
    """Describe the GPUs reported by ``nvidia-smi``.

    Returns:
        One entry per GPU joined with ``"; "``. Each entry is
        ``"<name> (<memory.total> MiB)"``, or just the name when the line has
        no memory column. ``"none"`` is returned when ``nvidia-smi`` is not on
        PATH, cannot be run, exits non-zero, exceeds the 5 second timeout, or
        lists no GPUs.
    """
    nvidia_smi = shutil.which("nvidia-smi")
    if not nvidia_smi:
        return "none"

    try:
        out = subprocess.check_output(
            [nvidia_smi, "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"],
            timeout=5,
            text=True,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # Best-effort probe: launch failure, non-zero exit, timeout, or
        # undecodable output all mean "no usable GPU information".
        return "none"

    gpus = []
    for line in out.splitlines():
        if not line.strip():
            # Skip blank lines so they do not turn into empty entries.
            continue
        parts = [p.strip() for p in line.split(",")]
        if len(parts) >= 2:
            gpus.append(f"{parts[0]} ({parts[1]} MiB)")
        else:
            gpus.append(parts[0])

    # An empty listing must read as "none", not as an empty string.
    return "; ".join(gpus) or "none"
|
|
|
|
|
|
def get_worker_info(name: str) -> dict:
    """Collect the description this worker sends when it registers.

    Args:
        name: Worker name to report.

    Returns:
        A dict with the keys ``name``, ``cpu``, ``memory``, ``gpu``, ``os``
        and ``kernel``, in that order.
    """
    details = {"name": name}
    details["cpu"] = get_cpu_info()
    details["memory"] = get_memory_info()
    details["gpu"] = get_gpu_info()
    # "os" is system + release; "kernel" repeats the bare release string.
    details["os"] = f"{platform.system()} {platform.release()}"
    details["kernel"] = platform.release()
    return details
|
|
|
|
|
|
def _script_result(exit_code: int, stdout: str, stderr: str) -> dict:
    """Build a result payload; ``job_id`` is left empty for the caller to set."""
    return {
        "job_id": "",
        "exit_code": exit_code,
        "stdout": stdout,
        "stderr": stderr,
    }


async def execute_script(script: str, timeout: int = 300) -> dict:
    """Run a script in a subprocess and return its outcome.

    The script is treated as Python when, ignoring leading whitespace, it
    starts with a Python shebang (``#!/usr/bin/env python`` or
    ``#!/usr/bin/python``), with ``# /// script`` (uv inline metadata), or
    with ``import `` / ``from ``. Python scripts are written to a temporary
    ``.py`` file and run via ``uv run --script``. Anything else is written to
    a ``.sh`` file and run with ``bash``. The temporary file is always removed.

    Args:
        script: Source text of the script.
        timeout: Seconds to wait for the script before killing it.

    Returns:
        A dict with ``job_id`` (always ``""`` here), ``exit_code``,
        ``stdout`` and ``stderr``. ``exit_code`` is ``-1`` when the script
        timed out or the interpreter could not be started; ``stderr`` then
        carries the reason.
    """
    is_python = script.lstrip().startswith(
        ("#!/usr/bin/env python", "# /// script", "#!/usr/bin/python", "import ", "from ")
    )
    suffix = ".py" if is_python else ".sh"

    # UTF-8 keeps the file independent of the worker's locale.
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=suffix, delete=False, encoding="utf-8"
    )
    script_path = tmp.name
    try:
        # Write inside the try so a failed write still removes the file.
        with tmp:
            tmp.write(script)

        if is_python:
            cmd = ["uv", "run", "--script", script_path]
        else:
            cmd = ["bash", script_path]

        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except OSError as exc:
            # A missing or unrunnable interpreter is a job failure. It is
            # reported as a result so it is not mistaken for a network error.
            return _script_result(-1, "", f"Failed to start {cmd[0]!r}: {exc}")

        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the timeout and the kill.
                pass
            await proc.wait()
            return _script_result(-1, "", f"Script timed out after {timeout}s")

        return _script_result(
            proc.returncode,
            stdout.decode(errors="replace"),
            stderr.decode(errors="replace"),
        )
    finally:
        os.unlink(script_path)
|
|
|
|
|
|
async def _handle_message(ws, message) -> None:
    """Process one server message: run an "execute" job and send its result.

    Messages of any other type are ignored. A malformed message is logged and
    skipped so one bad frame does not tear down the connection.
    """
    try:
        msg = json.loads(message)
        if msg.get("type") != "execute":
            return
        job_id = msg["job_id"]
        script = msg["script"]
        if not isinstance(script, str):
            raise TypeError("'script' must be a string")
    except (ValueError, KeyError, AttributeError, TypeError) as e:
        print(f"Ignoring malformed message: {e!r}")
        return

    print(f"Executing job {job_id}: {script[:80]}...")

    result = await execute_script(script)
    result["job_id"] = job_id
    result["type"] = "result"

    await ws.send(json.dumps(result))
    print(f"Job {job_id} done (exit={result['exit_code']})")


async def run_worker(server_url: str, name: str, reconnect_delay: float = 5.0):
    """Connect to the server, register, and execute jobs until cancelled.

    Each connection sends a ``register`` message with the worker info and
    expects a ``registered`` acknowledgement. Every ``execute`` message is
    then run with ``execute_script`` and answered with a ``result`` message.
    Jobs run one at a time, in arrival order.

    The function reconnects after ``reconnect_delay`` seconds whenever the
    connection ends. That covers errors, an unexpected acknowledgement, and a
    clean close by the server, so a server that closes politely cannot cause
    a tight reconnect loop.

    Args:
        server_url: WebSocket URL of the server's worker endpoint.
        name: Worker name to report on registration.
        reconnect_delay: Seconds to wait before each reconnection attempt.
    """
    info = get_worker_info(name)
    print(f"Worker info: {json.dumps(info, indent=2)}")

    while True:
        try:
            print(f"Connecting to {server_url} ...")
            async with websockets.connect(server_url) as ws:
                # Register
                await ws.send(json.dumps({"type": "register", "info": info}))

                ack = json.loads(await ws.recv())
                if ack.get("type") != "registered":
                    print(f"Unexpected ack: {ack}")
                else:
                    print(f"Registered as '{ack.get('name')}'")

                    # Main loop: receive jobs, execute, send results
                    async for message in ws:
                        await _handle_message(ws, message)

                    # The iterator ends without raising on a clean close.
                    print(f"Connection closed by server, reconnecting in {reconnect_delay:g}s...")

        except (websockets.exceptions.ConnectionClosed, ConnectionRefusedError, OSError) as e:
            print(f"Connection lost ({e}), reconnecting in {reconnect_delay:g}s...")
        except Exception as e:
            print(f"Error: {e}")

        # Single back-off point shared by every way a connection can end.
        await asyncio.sleep(reconnect_delay)
|
|
|
|
|
|
def main():
    """Parse command-line options and start the worker loop.

    Options:
        --server: WebSocket server URL.
        --name: Worker name; defaults to this machine's hostname.
    """
    cli = argparse.ArgumentParser(description="Tori Worker")
    cli.add_argument(
        "--server",
        default="wss://tori.euphon.cloud/ws/tori/workers",
        help="WebSocket server URL",
    )
    cli.add_argument(
        "--name",
        default=platform.node(),
        help="Worker name (default: hostname)",
    )
    options = cli.parse_args()
    server_url, worker_name = options.server, options.name

    print(f"Starting tori-worker '{worker_name}' -> {server_url}")
    asyncio.run(run_worker(server_url, worker_name))


# Run the worker only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|