import warp as wp
import numpy as np
import logging
logger = logging.getLogger("unsprawl.scenarios")
# Scenario director: resets the engine's agent buffers to scripted initial states.
class ScenarioDirector:
    """Set up scripted initial conditions ("scenarios") on a simulation engine.

    Each ``run_*`` method builds host-side NumPy arrays describing every
    agent and then replaces the engine's ``pos_buffer`` / ``vel_buffer``
    attributes with new Warp arrays created from them.

    Array layout used by every scenario:

    * positions: shape ``(num_agents, 3)``, float32, columns ``x, y, state``
    * velocities: shape ``(num_agents, 2)``, float32, columns ``vx, vy``

    The engine object is expected to expose ``num_agents``, ``device``,
    ``pos_buffer`` and ``vel_buffer``.
    """

    # Values stored in the third position column (the per-agent "state").
    _STATE_NORMAL = 0.0
    _STATE_BRAKING = 1.0
    _STATE_EMERGENCY = 2.0  # described as "Cyan" by the original author

    # Number of agents each scripted scenario would like to drive.
    _SHOCKWAVE_AGENTS = 5000
    _DARK_DATA_AGENTS = 5000

    def __init__(self, engine):
        """Remember the engine whose buffers the scenarios will replace."""
        self.engine = engine

    # ------------------------------------------------------------------
    # Public scenarios
    # ------------------------------------------------------------------

    def run_shockwave(self):
        """Scenario 1: Fluid Dynamics.

        Spawns up to 5000 agents on a 3-lane highway heading east and
        forces the lead car of the middle lane (plus a few followers) to
        brake so that a shockwave can develop. If the engine holds fewer
        agents than requested, a warning is logged and the scenario is
        scaled down to what is available.
        """
        logger.info("ACTION: Running Scenario 'SHOCKWAVE'")

        total_agents = self.engine.num_agents
        highway_agents = self._SHOCKWAVE_AGENTS
        if highway_agents > total_agents:
            logger.warning(
                "Scenario requires %s, but engine only has %s",
                highway_agents,
                total_agents,
            )
            highway_agents = total_agents

        positions, velocities = self._build_shockwave_state(
            total_agents, highway_agents
        )
        self._upload_state(positions, velocities)
        logger.info("SCENARIO: Shockwave Initiated.")

    def run_green_wave(self):
        """Scenario 2: The Green Wave (Resilience).

        Places every agent in a dense, slow-moving cluster and turns the
        last agent into a fast emergency vehicle approaching from the west.
        """
        logger.info("ACTION: Running Scenario 'GREEN WAVE'")
        positions, velocities = self._build_green_wave_state(
            self.engine.num_agents
        )
        self._upload_state(positions, velocities)
        logger.info("SCENARIO: Green Wave Initiated.")

    def run_dark_data(self):
        """Scenario 3: Dark Data (Refinery).

        Lays out a 50-column "river" of up to 5000 agents moving north.
        Only the initial state is set up here; any diversion around a
        closed road would have to be handled elsewhere (the original note
        suggests the simulation kernel). If the engine holds fewer agents
        than requested, a warning is logged and the stream is shortened.
        """
        logger.info("ACTION: Running Scenario 'DARK DATA'")

        total_agents = self.engine.num_agents
        stream_agents = self._DARK_DATA_AGENTS
        if stream_agents > total_agents:
            # Without this clamp the fill would index past the end of the
            # arrays, which are sized by the engine's agent count.
            logger.warning(
                "Scenario requires %s, but engine only has %s",
                stream_agents,
                total_agents,
            )
            stream_agents = total_agents

        positions, velocities = self._build_dark_data_state(
            total_agents, stream_agents
        )
        self._upload_state(positions, velocities)
        logger.info("SCENARIO: Dark Data Initiated.")

    # ------------------------------------------------------------------
    # Private helpers (pure NumPy, no engine / GPU access)
    # ------------------------------------------------------------------

    @classmethod
    def _build_shockwave_state(cls, total_agents: int, highway_agents: int):
        """Return ``(positions, velocities)`` for the shockwave scenario.

        Args:
            total_agents: Number of rows to allocate (engine agent count).
            highway_agents: How many agents to place on the highway; they
                are split evenly over three lanes (remainder is unused).

        Agents not placed on the highway get a seeded uniform random
        position in ``[0, 5000)`` on both axes and zero velocity.
        """
        rng = np.random.default_rng(42)
        positions = np.zeros((total_agents, 3), dtype=np.float32)
        velocities = np.zeros((total_agents, 2), dtype=np.float32)

        # Scatter everyone first so leftover agents are not stacked at (0, 0).
        positions[:, 0] = rng.uniform(0, 5000, total_agents)
        positions[:, 1] = rng.uniform(0, 5000, total_agents)

        per_lane = highway_agents // 3
        if per_lane == 0:
            # Not enough agents for even one car per lane; without this
            # guard the "lead car" index would be -1 and hit the last row.
            return positions, velocities

        # Same X layout for every lane: evenly spaced from x=1000 to x=4000.
        lane_x = np.linspace(1000, 4000, per_lane).astype(np.float32)

        for lane_idx in range(3):
            first = lane_idx * per_lane
            last = first + per_lane  # exclusive

            positions[first:last, 0] = lane_x
            # Lanes sit at y = 2490, 2500 and 2510.
            positions[first:last, 1] = 2500.0 + (lane_idx - 1) * 10.0
            positions[first:last, 2] = cls._STATE_NORMAL

            # Everyone drives east at 25 m/s (~90 km/h).
            velocities[first:last, 0] = 25.0
            velocities[first:last, 1] = 0.0

            if lane_idx == 1:
                # Highest X in the lane is the last index: the lead car.
                lead = last - 1
                velocities[lead, 0] = 0.0
                positions[lead, 2] = cls._STATE_BRAKING

                # Slow up to five followers to seed the wave. Clamp to the
                # lane start so short lanes never touch lane 0 or wrap to a
                # negative index.
                followers_from = max(first, lead - 5)
                velocities[followers_from:lead, 0] = 5.0
                positions[followers_from:lead, 2] = cls._STATE_BRAKING

        return positions, velocities

    @classmethod
    def _build_green_wave_state(cls, total_agents: int):
        """Return ``(positions, velocities)`` for the green-wave scenario.

        All agents are placed uniformly at random (seeded) inside the
        square ``[2000, 3000) x [2000, 3000)`` with velocity components in
        ``[-0.5, 0.5)``. When at least one agent exists, the last one is
        overwritten to be the emergency vehicle at ``(1800, 2500)`` moving
        east at 30 units per second.
        """
        rng = np.random.default_rng(99)
        positions = np.zeros((total_agents, 3), dtype=np.float32)
        velocities = np.zeros((total_agents, 2), dtype=np.float32)

        positions[:, 0] = rng.uniform(2000, 3000, total_agents)
        positions[:, 1] = rng.uniform(2000, 3000, total_agents)
        positions[:, 2] = cls._STATE_NORMAL

        # Near-zero drift to represent gridlock.
        velocities[:, :] = (rng.random((total_agents, 2)) - 0.5) * 1.0

        if total_agents > 0:
            # Indexing row -1 of an empty array would raise IndexError.
            positions[-1] = [1800.0, 2500.0, cls._STATE_EMERGENCY]
            velocities[-1] = [30.0, 0.0]

        return positions, velocities

    @classmethod
    def _build_dark_data_state(
        cls, total_agents: int, stream_agents: int, cols: int = 50
    ):
        """Return ``(positions, velocities)`` for the dark-data scenario.

        The first ``stream_agents`` agents (never more than
        ``total_agents``) are laid out row-major on a 10-unit grid that
        starts at ``(2000, 1000)`` and is ``cols`` agents wide, each
        moving north at 20 units per second. Remaining agents stay at the
        origin with zero velocity.
        """
        positions = np.zeros((total_agents, 3), dtype=np.float32)
        velocities = np.zeros((total_agents, 2), dtype=np.float32)

        count = max(0, min(stream_agents, total_agents))
        agent_ids = np.arange(count)

        positions[:count, 0] = 2000.0 + (agent_ids % cols) * 10
        positions[:count, 1] = 1000.0 + (agent_ids // cols) * 10
        positions[:count, 2] = cls._STATE_NORMAL
        velocities[:count, 1] = 20.0  # moving north

        return positions, velocities

    def _upload_state(self, positions, velocities):
        """Replace the engine's position and velocity buffers.

        Note that this rebinds ``engine.pos_buffer`` / ``engine.vel_buffer``
        to new Warp arrays rather than writing into the existing ones, so
        anything still holding a reference to the old arrays will not see
        the new state.
        """
        self.engine.pos_buffer = wp.from_numpy(
            positions, dtype=wp.vec3, device=self.engine.device
        )
        self.engine.vel_buffer = wp.from_numpy(
            velocities, dtype=wp.vec2, device=self.engine.device
        )