import struct
import asyncio
import logging
import time
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from unsprawl.simulation.advect import AdvectEngine
from unsprawl.scenarios import ScenarioDirector
# Initialize Core
# ASGI application object; the WebSocket routes defined below register on it.
app = FastAPI()
logger = logging.getLogger("unsprawl.api")
# NOTE: basicConfig configures the root logger, and it runs as an import-time
# side effect of loading this module.
logging.basicConfig(level=logging.INFO)
# Global Simulation & Director (Singleton)
# Both start as None and are populated lazily by get_simulation().
SIMULATION = None
DIRECTOR = None
def get_simulation():
    """Return the shared ``(simulation, director)`` pair, creating it on first use.

    The pair is stored in the module-level ``SIMULATION`` and ``DIRECTOR``
    globals, so every caller receives the same two objects.

    Returns:
        tuple: ``(AdvectEngine, ScenarioDirector)``.

    Raises:
        Whatever ``AdvectEngine`` or ``ScenarioDirector`` raise on
        construction. In that case neither global is modified, so the next
        call retries the whole initialization.
    """
    global SIMULATION, DIRECTOR
    if SIMULATION is None:
        logger.info("Initializing Advect Engine & Director...")
        # Build both objects into locals first and publish them together.
        # Assigning SIMULATION before the director exists would, if the
        # director constructor raised, leave SIMULATION set and DIRECTOR None
        # forever (the guard above would never run again).
        simulation = AdvectEngine(num_agents=10_000)
        director = ScenarioDirector(simulation)
        SIMULATION, DIRECTOR = simulation, director
    return SIMULATION, DIRECTOR
# --- NEXUS: Event Bus (JSON) ---
class ConnectionManager:
    """Track accepted WebSocket connections and broadcast JSON messages to them.

    Attributes are documented inline; ``active_connections`` is a plain list
    that callers may inspect.
    """

    def __init__(self):
        # Connections in the order they were accepted.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister ``websocket``; a no-op if it is not registered.

        ``broadcast`` may already have pruned a connection whose send failed,
        so a later ``disconnect`` for the same socket must not raise.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (e.g. pruned by broadcast) - nothing to do.
            pass

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every registered connection.

        Connections whose send raises are treated as dead and unregistered;
        the failure is not propagated to the caller.
        """
        dead = []
        # Iterate over a snapshot: each ``await`` can yield to other tasks
        # that connect or disconnect, and mutating a list while iterating it
        # skips entries.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that task
                # cancellation and KeyboardInterrupt still propagate.
                dead.append(connection)
        for connection in dead:
            self.disconnect(connection)
# Module-level manager instance used by the /ws/nexus endpoint to register
# clients and broadcast events to them.
nexus_manager = ConnectionManager()
@app.websocket("/ws/nexus")
async def nexus_endpoint(websocket: WebSocket):
    """
    The Nexus: High-level command & control.
    Bi-directional JSON events.

    Incoming messages are JSON objects. ``{"action": "start_scenario",
    "id": <scenario>}`` runs the matching director scenario
    (``shockwave``, ``green_wave`` or ``dark_data``) and broadcasts a
    ``scenario_started`` event to every Nexus client. Messages with any
    other action are ignored. An unknown scenario id is answered with an
    ``error`` event sent only to the requesting client.
    """
    _, director = get_simulation()
    await nexus_manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            # Valid JSON need not be an object (e.g. a list or a number);
            # skip it instead of crashing on ``.get`` and dropping the client.
            if not isinstance(data, dict):
                logger.warning("Nexus: ignoring non-object message: %r", data)
                continue
            # Handle Commands
            if data.get("action") != "start_scenario":
                continue
            scenario_id = data.get("id")
            if scenario_id == "shockwave":
                director.run_shockwave()
            elif scenario_id == "green_wave":
                director.run_green_wave()
            elif scenario_id == "dark_data":
                director.run_dark_data()
            else:
                # Nothing was started, so do not announce "scenario_started".
                logger.warning("Nexus: unknown scenario id: %r", scenario_id)
                await websocket.send_json({
                    "type": "error",
                    "message": "unknown scenario",
                    "id": scenario_id,
                    "timestamp": time.time(),
                })
                continue
            # Acknowledge
            await nexus_manager.broadcast({
                "type": "scenario_started",
                "id": scenario_id,
                "timestamp": time.time(),
            })
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    except Exception as e:
        # ``exception`` keeps the original message and adds the traceback.
        logger.exception("Nexus error: %s", e)
    finally:
        # Runs on every exit path, including task cancellation. The
        # membership check avoids a ValueError when a failed broadcast has
        # already pruned this socket from the manager.
        if websocket in nexus_manager.active_connections:
            nexus_manager.disconnect(websocket)
# --- ADVECT: Firehose (Binary) ---
@app.websocket("/ws/advect")
async def advect_endpoint(websocket: WebSocket):
    """
    The Firehose: Unidirectional Physics Stream.
    Raw Binary, paced at ~60Hz.

    Each frame sent to the client is a 12-byte little-endian header
    (``status`` u32, ``agent count`` u32, ``grid load`` f32) followed by the
    bytes returned by ``sim.tick()``. The loop runs until the client
    disconnects or an error occurs; errors are logged, not re-raised.
    """
    await websocket.accept()
    sim, _ = get_simulation()
    frame_dt = 1.0 / 60.0
    # '<IIf' packs to 12 bytes: status (u32), agent count (u32), grid load (f32).
    # Compiled once here instead of re-parsing the format on every frame.
    pack_header = struct.Struct('<IIf').pack
    # NOTE(review): the engine is shared, and each connected client runs its
    # own copy of this loop, so the simulation is stepped once per client per
    # frame - confirm that is intended.
    try:
        while True:
            frame_start = time.perf_counter()
            # 1. Step Physics
            # Per the original author's note, tick() returns the raw bytes of
            # an (N, 3) float32 array [x, y, state] - TODO confirm.
            position_bytes = sim.tick(dt=frame_dt)
            # 2. Header
            status = 1  # hard-coded
            grid_load = 0.85  # hard-coded
            header = pack_header(status, sim.num_agents, grid_load)
            # 3. Send (the concatenation copies the position buffer once)
            await websocket.send_bytes(header + position_bytes)
            # 4. 60Hz Pacing
            elapsed = time.perf_counter() - frame_start
            # Always await the sleep: when a frame overruns its budget the
            # delay is 0, which still yields to the event loop so other
            # connections and endpoints are not starved.
            await asyncio.sleep(max(0.0, frame_dt - elapsed))
    except WebSocketDisconnect:
        logger.info("Advect client disconnected")
    except Exception as e:
        # ``exception`` keeps the original message and adds the traceback.
        logger.exception("Advect error: %s", e)