Source code for unsprawl.api

import struct
import asyncio
import logging
import time
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from unsprawl.simulation.advect import AdvectEngine
from unsprawl.scenarios import ScenarioDirector

# Initialize Core
app = FastAPI()
logger = logging.getLogger("unsprawl.api")
logging.basicConfig(level=logging.INFO)

# Global Simulation & Director (Singleton)
SIMULATION = None
DIRECTOR = None

[docs] def get_simulation(): global SIMULATION, DIRECTOR if SIMULATION is None: logger.info("Initializing Advect Engine & Director...") SIMULATION = AdvectEngine(num_agents=10_000) DIRECTOR = ScenarioDirector(SIMULATION) return SIMULATION, DIRECTOR
# --- NEXUS: Event Bus (JSON) ---
[docs] class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = []
[docs] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket)
[docs] def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket)
[docs] async def broadcast(self, message: dict): # Clean out dead connections implicitly handled by send failing? # Better to iterate safely. to_remove = [] for connection in self.active_connections: try: await connection.send_json(message) except: to_remove.append(connection) for c in to_remove: if c in self.active_connections: self.active_connections.remove(c)
nexus_manager = ConnectionManager()
[docs] @app.websocket("/ws/nexus") async def nexus_endpoint(websocket: WebSocket): """ The Nexus: High-level command & control. Bi-directional JSON events. """ sim, director = get_simulation() await nexus_manager.connect(websocket) try: while True: data = await websocket.receive_json() # Handle Commands if data.get("action") == "start_scenario": scenario_id = data.get("id") if scenario_id == "shockwave": director.run_shockwave() elif scenario_id == "green_wave": director.run_green_wave() elif scenario_id == "dark_data": director.run_dark_data() # Acknowledge await nexus_manager.broadcast({ "type": "scenario_started", "id": scenario_id, "timestamp": time.time() }) except WebSocketDisconnect: nexus_manager.disconnect(websocket) except Exception as e: logger.error(f"Nexus error: {e}") nexus_manager.disconnect(websocket)
# --- ADVECT: Firehose (Binary) ---
[docs] @app.websocket("/ws/advect") async def advect_endpoint(websocket: WebSocket): """ The Firehose: Unidirectional Physics Stream. Raw Binary ~60Hz. Zero-Copy from Warp. """ await websocket.accept() sim, _ = get_simulation() try: while True: start_time = time.perf_counter() # 1. Step Physics # Returns raw bytes of (N, 3) float32 array [x, y, state] position_bytes = sim.tick(dt=1.0/60.0) # 2. Header (16 Bytes now?) # [Status (u32), AgentCount (u32), GridLoad (f32), FPS_Target (f32)?] # Old was: status, count, grid_load status = 1 count = sim.num_agents grid_load = 0.85 header = struct.pack('<IIf', status, count, grid_load) payload = header + position_bytes # 3. Send await websocket.send_bytes(payload) # 4. 60Hz Pacing elapsed = time.perf_counter() - start_time sleep_time = max(0.0, (1.0/60.0) - elapsed) if sleep_time > 0: await asyncio.sleep(sleep_time) except WebSocketDisconnect: logger.info("Advect client disconnected") except Exception as e: logger.error(f"Advect error: {e}")