from __future__ import annotations
import math
import os
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
import requests
# Geographic bounding box. DataSFProvider.rasterize unpacks it as
# (lon_min, lat_min, lon_max, lat_max).
BBox = tuple[float, float, float, float]
[docs]
def _default_data_root() -> Path:
return Path("~/.unsprawl/data").expanduser()
[docs]
@dataclass(frozen=True)
class SFRaster:
    """Immutable bundle of four 2-D float32 grids describing a vector field.

    ``lon``/``lat`` hold the coordinates of every grid node and ``u``/``v`` hold
    the eastward/northward components accumulated at that node. All four arrays
    share the same ``(H, W)`` shape.

    NOTE(review): the field comments below give ``u``/``v`` in m/s, but the only
    producer visible in this file, ``DataSFProvider.rasterize``, divides speed by
    metres-per-degree before accumulating and therefore stores degrees per
    second. Confirm which unit consumers expect.
    """

    # Longitude of each grid node.
    lon: np.ndarray[tuple[int, int], np.dtype[np.float32]]  # shape (H, W)
    # Latitude of each grid node.
    lat: np.ndarray[tuple[int, int], np.dtype[np.float32]]  # shape (H, W)
    # Eastward component of the field (see the unit note in the class docstring).
    u: np.ndarray[tuple[int, int], np.dtype[np.float32]]  # m/s eastward, shape (H, W)
    # Northward component of the field (see the unit note in the class docstring).
    v: np.ndarray[tuple[int, int], np.dtype[np.float32]]  # m/s northward, shape (H, W)
[docs]
@dataclass(frozen=True)
class DataSFProvider:
    """Provider for San Francisco mobility vector fields (network-first, synthetic
    fallback).

    No token is required. ``DATASF_TOKEN`` is reserved for a future authenticated
    endpoint; the current implementation only pings DataSF and then always
    serves a deterministic synthetic road grid, which it caches on disk.

    Attributes:
        data_root: Root directory under which the provider stores its cache.
    """

    data_root: Path = _default_data_root()

    @property
    def cache_dir(self) -> Path:
        """Directory for cached SF mobility files: ``<data_root>/us/ca/sf/mobility``."""
        return self.data_root / "us" / "ca" / "sf" / "mobility"

    def _ensure_cache(self) -> None:
        """Create ``cache_dir`` (including parents) if it does not exist yet."""
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _load_cached_roads(parquet_path: Path) -> pd.DataFrame | None:
        """Return cached road vectors, or ``None`` when no usable cache exists.

        The parquet file is preferred; the sibling ``.csv`` written by
        ``fetch_road_vectors`` when parquet support is unavailable is tried next.
        """
        candidates = (
            (parquet_path, pd.read_parquet),
            (parquet_path.with_suffix(".csv"), pd.read_csv),
        )
        for path, reader in candidates:
            if not path.exists():
                continue
            try:
                return reader(path)
            except Exception:
                # Deliberately broad: a missing parquet engine raises ImportError,
                # a corrupt file raises engine-specific errors. An unreadable
                # cache must never be fatal, so try the next candidate.
                continue
        return None

    def fetch_road_vectors(self, *, force: bool = False) -> pd.DataFrame:
        """Fetch simplified road vectors with speed estimates.

        Args:
            force: When true, ignore any cached file and regenerate the data.

        Returns:
            DataFrame with columns ``lon1, lat1, lon2, lat2, speed_mps``.

        Network-first; synthetic fallback on error. The result is cached as
        parquet, or as CSV when parquet cannot be written.
        """
        self._ensure_cache()
        cache = self.cache_dir / "road_vectors.parquet"
        if not force:
            cached = self._load_cached_roads(cache)
            if cached is not None:
                return cached

        # Placeholder for the real network fetch: there is no public endpoint
        # wired in yet, so this only pings DataSF and discards the response.
        try:
            requests.get("https://data.sfgov.org/api/odata2/", timeout=5)
        except requests.RequestException:
            # Offline or rate-limited: the synthetic fallback below still works.
            pass

        df = self._synthetic_roads()
        try:
            df.to_parquet(cache, index=False)
        except Exception:
            # Parquet needs an optional engine (pyarrow/fastparquet); keep a CSV
            # copy instead so the next call can still be served from disk.
            df.to_csv(cache.with_suffix(".csv"), index=False)
        return df

    def _synthetic_roads(self) -> pd.DataFrame:
        """Build a small, deterministic grid of synthetic road segments over SF.

        Every segment runs west-to-east between neighbouring longitudes at a
        fixed latitude (``lat1 == lat2``), with a speed drawn from a generator
        seeded with 0, in the range 5-7 m/s.
        """
        rng = np.random.default_rng(0)
        xs = np.linspace(-122.53, -122.35, 20)
        ys = np.linspace(37.70, 37.83, 16)
        # Iteration order (longitude outer, latitude inner) fixes which random
        # draw each segment receives, so keep it stable.
        rows = [
            (lon1, lat, lon2, lat, float(5.0 + 2.0 * rng.random()))
            for lon1, lon2 in zip(xs[:-1], xs[1:])
            for lat in ys[:-1]
        ]
        return pd.DataFrame(rows, columns=["lon1", "lat1", "lon2", "lat2", "speed_mps"])

    @staticmethod
    def _accumulate_segments(
        df: pd.DataFrame,
        origin: tuple[float, float],
        span: tuple[float, float],
        u_grid: np.ndarray,
        v_grid: np.ndarray,
    ) -> None:
        """Add each in-bbox segment's velocity to the grid node nearest its midpoint.

        ``origin`` is ``(lon_min, lat_min)`` and ``span`` is the signed bbox
        extent ``(lon_max - lon_min, lat_max - lat_min)``; both grids are
        modified in place.
        """
        lon_min, lat_min = origin
        lon_span, lat_span = span
        height, width = u_grid.shape

        mid_lon = ((df["lon1"] + df["lon2"]) * 0.5).to_numpy()
        mid_lat = ((df["lat1"] + df["lat2"]) * 0.5).to_numpy()
        # Fractional position of each midpoint inside the bbox (0..1 when inside).
        fx = (mid_lon - lon_min) / lon_span
        fy = (mid_lat - lat_min) / lat_span
        # Drop segments whose midpoint is outside the bbox; clamping them would
        # pile unrelated traffic onto the border cells.
        inside = (fx >= 0.0) & (fx <= 1.0) & (fy >= 0.0) & (fy <= 1.0)
        if not inside.any():
            return

        dx = (df["lon2"] - df["lon1"]).to_numpy()[inside]
        dy = (df["lat2"] - df["lat1"]).to_numpy()[inside]
        speed = df["speed_mps"].to_numpy(dtype=np.float32)[inside]

        # Unit direction in lon/lat degree space; zero-length segments get a
        # zero direction instead of a division by zero.
        norm = np.hypot(dx, dy)
        norm[norm == 0] = 1.0

        # Rough metres per degree at SF latitude.
        m_per_deg_lat = 111_320.0
        m_per_deg_lon = 111_320.0 * math.cos(math.radians(37.77))
        ux = (dx / norm) * speed / m_per_deg_lon  # deg/s
        vy = (dy / norm) * speed / m_per_deg_lat  # deg/s

        # Nearest grid node: nodes sit at fraction k / (n - 1) along each axis.
        ix = np.rint(fx[inside] * (width - 1)).astype(int)
        iy = np.rint(fy[inside] * (height - 1)).astype(int)
        # ufunc.at is unbuffered, so several segments landing on one node all add up.
        np.add.at(u_grid, (iy, ix), ux.astype(np.float32))
        np.add.at(v_grid, (iy, ix), vy.astype(np.float32))

    def rasterize(self, bbox: BBox, res: int = 128) -> SFRaster:
        """Rasterize road vectors into an Eulerian grid over bbox.

        Segment directions are aggregated into a (u, v) field by simple binning:
        each segment whose midpoint lies inside ``bbox`` adds its direction
        times speed to the nearest grid node. Segments outside ``bbox`` are
        ignored.

        Args:
            bbox: ``(lon_min, lat_min, lon_max, lat_max)`` in degrees.
            res: Number of grid nodes along each axis (the grid is ``res x res``).

        Returns:
            ``SFRaster`` with float32 ``lon``, ``lat``, ``u`` and ``v`` arrays of
            shape ``(res, res)``.

        Raises:
            ValueError: If ``res`` is smaller than 1 or ``bbox`` has zero extent
                along either axis.

        NOTE(review): ``u``/``v`` are accumulated in degrees per second (speed
        divided by metres-per-degree), whereas ``SFRaster`` annotates them as
        m/s. Confirm the intended unit before changing either side.
        """
        lon_min, lat_min, lon_max, lat_max = bbox
        height = width = int(res)
        if width < 1:
            raise ValueError(f"res must be >= 1, got {res!r}")
        lon_span = lon_max - lon_min
        lat_span = lat_max - lat_min
        if lon_span == 0 or lat_span == 0:
            raise ValueError(
                f"bbox must have non-zero extent on both axes, got {bbox!r}"
            )

        lon_grid, lat_grid = np.meshgrid(
            np.linspace(lon_min, lon_max, width),
            np.linspace(lat_min, lat_max, height),
        )
        u_grid = np.zeros_like(lon_grid, dtype=np.float32)
        v_grid = np.zeros_like(lat_grid, dtype=np.float32)

        df = self.fetch_road_vectors()
        if not df.empty:
            self._accumulate_segments(
                df, (lon_min, lat_min), (lon_span, lat_span), u_grid, v_grid
            )

        return SFRaster(
            lon=lon_grid.astype(np.float32),
            lat=lat_grid.astype(np.float32),
            u=u_grid,
            v=v_grid,
        )