Source code for unsprawl.scenarios

import warp as wp
import numpy as np
import logging

logger = logging.getLogger("unsprawl.scenarios")

[docs] class ScenarioDirector: def __init__(self, engine): self.engine = engine
[docs] def run_shockwave(self): """ Scenario 1: Fluid Dynamics Spawns 5000 agents on a 3-lane highway. Force lead car to brake. """ logger.info("ACTION: Running Scenario 'SHOCKWAVE'") # 1. Reset Logic (Simplified: Just overwrite positions) num_agents = 5000 if num_agents > self.engine.num_agents: logger.warning(f"Scenario requires {num_agents}, but engine only has {self.engine.num_agents}") num_agents = self.engine.num_agents # 2. Spawn on 3 lanes # Map is 5000x5000. Let's use x=1000 to x=4000, y=2500 (middle) # Lanes at y=2490, 2500, 2510 rng = np.random.default_rng(42) new_pos = np.zeros((self.engine.num_agents, 3), dtype=np.float32) new_vel = np.zeros((self.engine.num_agents, 2), dtype=np.float32) # Park unused agents off-screen or valid # Let's just randomize them lightly so they aren't all at 0,0 new_pos[:, 0] = rng.uniform(0, 5000, self.engine.num_agents) new_pos[:, 1] = rng.uniform(0, 5000, self.engine.num_agents) # Set specific agents for the highway agents_per_lane = num_agents // 3 for lane_idx in range(3): y_offset = (lane_idx - 1) * 10.0 # -10, 0, 10 relative to center y_pos = 2500.0 + y_offset # Start index for this lane start_idx = lane_idx * agents_per_lane end_idx = start_idx + agents_per_lane # Distribute along X # spacing = 3000m / agents # High density x_positions = np.linspace(1000, 4000, agents_per_lane).astype(np.float32) new_pos[start_idx:end_idx, 0] = x_positions new_pos[start_idx:end_idx, 1] = y_pos new_pos[start_idx:end_idx, 2] = 0.0 # Normal State # Velocity: Move East new_vel[start_idx:end_idx, 0] = 25.0 # 25 m/s (~90km/h) new_vel[start_idx:end_idx, 1] = 0.0 # FORCE BRAKE on Leading Car of Middle Lane to cause shockwave if lane_idx == 1: # The last agent in the list is the lead car (highest X) lead_idx = end_idx - 1 new_vel[lead_idx, 0] = 0.0 new_pos[lead_idx, 2] = 1.0 # Braking State # Make a few behind it slow too to start the wave new_vel[lead_idx-5:lead_idx, 0] = 5.0 new_pos[lead_idx-5:lead_idx, 2] = 1.0 # DATA UPLOAD # "Direct Memory Access" using wp.from_numpy to overwrite existing buffer self.engine.pos_buffer = wp.from_numpy(new_pos, dtype=wp.vec3, device=self.engine.device) self.engine.vel_buffer = wp.from_numpy(new_vel, dtype=wp.vec2, device=self.engine.device) logger.info("SCENARIO: Shockwave Initiated.")
[docs] def run_green_wave(self): """ Scenario 2: The Green Wave (Resilience) High density gridlock + 1 Emergency Agent (Cyan). """ logger.info("ACTION: Running Scenario 'GREEN WAVE'") num_agents = self.engine.num_agents rng = np.random.default_rng(99) # 1. Spawn Gridlock (High Density Random) new_pos = np.zeros((num_agents, 3), dtype=np.float32) new_vel = np.zeros((num_agents, 2), dtype=np.float32) # Grid block: 2000-3000 x 2000-3000 new_pos[:, 0] = rng.uniform(2000, 3000, num_agents) new_pos[:, 1] = rng.uniform(2000, 3000, num_agents) new_pos[:, 2] = 0.0 # Normal # Low velocity (Gridlock) new_vel[:, :] = (rng.random((num_agents, 2)) - 0.5) * 1.0 # 2. Emergency Agent # ID 99999 (or just last index) # Position it at start of grid emergency_idx = num_agents - 1 new_pos[emergency_idx] = [1800.0, 2500.0, 2.0] # State 2.0 = Emergency (Cyan) new_vel[emergency_idx] = [30.0, 0.0] # High speed # Clear path in front of it (Pseudo-Green Wave visual) # Agents directly in front get moved aside slightly or frozen # For visual proof, we just color them "Yielding" maybe? Or just rely on the gap. # Let's just set the state for now. self.engine.pos_buffer = wp.from_numpy(new_pos, dtype=wp.vec3, device=self.engine.device) self.engine.vel_buffer = wp.from_numpy(new_vel, dtype=wp.vec2, device=self.engine.device) logger.info("SCENARIO: Green Wave Initiated.")
[docs] def run_dark_data(self): """ Scenario 3: Dark Data (Refinery) Rerouting around a closed node. """ logger.info("ACTION: Running Scenario 'DARK DATA'") # For this visual proof, we simulate a "river" of agents hitting a block num_agents = 5000 cols = 50 rows = 100 new_pos = np.zeros((self.engine.num_agents, 3), dtype=np.float32) new_vel = np.zeros((self.engine.num_agents, 2), dtype=np.float32) # Stream of agents for i in range(num_agents): row = i // cols col = i % cols x = 2000.0 + col * 10 y = 1000.0 + row * 10 new_pos[i] = [x, y, 0.0] new_vel[i] = [0.0, 20.0] # Moving North # The Block (Mocking a closed road at y=2000) # Agents approaching y=2000 need to divert # This logic would typically be in the kernel (collision). # For "Action", we just set up the initial state and maybe update the 'map state' # But here we just reset positions to start the flow. self.engine.pos_buffer = wp.from_numpy(new_pos, dtype=wp.vec3, device=self.engine.device) self.engine.vel_buffer = wp.from_numpy(new_vel, dtype=wp.vec2, device=self.engine.device) logger.info("SCENARIO: Dark Data Initiated.")