Source code for unsprawl.providers.data.us.ca.sf.datasf_provider

from __future__ import annotations

import math
import os
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd
import requests

# Longitude/latitude bounding box, ordered (lon_min, lat_min, lon_max, lat_max);
# this is the order in which DataSFProvider.rasterize unpacks it.
BBox = tuple[float, float, float, float]


def _default_data_root() -> Path:
    """Return the default data directory, ``~/.unsprawl/data``, with ``~`` expanded."""
    unexpanded = Path("~/.unsprawl/data")
    return unexpanded.expanduser()
@dataclass(frozen=True)
class SFRaster:
    """Immutable bundle of four 2-D float32 grids describing a vector field.

    All four arrays are declared with shape ``(H, W)``.

    Attributes:
        lon: Longitude of each cell.
        lat: Latitude of each cell.
        u: Eastward component, declared here as m/s.
        v: Northward component, declared here as m/s.

    NOTE(review): ``DataSFProvider.rasterize`` labels the values it stores in
    ``u``/``v`` as deg/s, not m/s -- confirm which unit consumers expect.
    """

    lon: np.ndarray[tuple[int, int], np.dtype[np.float32]]
    lat: np.ndarray[tuple[int, int], np.dtype[np.float32]]
    u: np.ndarray[tuple[int, int], np.dtype[np.float32]]
    v: np.ndarray[tuple[int, int], np.dtype[np.float32]]
@dataclass(frozen=True)
class DataSFProvider:
    """Provider for San Francisco mobility vector fields.

    Road vectors are currently always synthetic. ``fetch_road_vectors`` sends
    a best-effort request to the DataSF portal as a placeholder for a real
    download, but the response is not used. No token is required, and the
    ``DATASF_TOKEN`` environment variable is not consulted yet.

    Attributes:
        data_root: Root directory under which cached data is stored.
    """

    # Columns every road-vector table must provide, in output order.
    _ROAD_COLUMNS = ("lon1", "lat1", "lon2", "lat2", "speed_mps")

    data_root: Path = _default_data_root()

    @property
    def cache_dir(self) -> Path:
        """Directory holding cached SF mobility data below ``data_root``."""
        return self.data_root / "us" / "ca" / "sf" / "mobility"

    def _ensure_cache(self) -> None:
        """Create ``cache_dir`` (and any missing parents) if it does not exist."""
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _load_cached_roads(self, parquet_path: Path, csv_path: Path) -> pd.DataFrame | None:
        """Return the first readable, well-formed cached table, or ``None``.

        The parquet file is preferred; the CSV file is the fallback that
        ``fetch_road_vectors`` writes when the parquet write fails.
        """
        readers = (
            (parquet_path, pd.read_parquet),
            # round_trip keeps the CSV-stored floats identical to what was written.
            (csv_path, lambda p: pd.read_csv(p, float_precision="round_trip")),
        )
        for path, reader in readers:
            if not path.exists():
                continue
            try:
                frame = reader(path)
            except Exception:
                # Deliberately broad: an unreadable or corrupt cache (or a
                # missing parquet engine) must never block regeneration.
                continue
            if set(self._ROAD_COLUMNS).issubset(frame.columns):
                return frame
        return None

    def fetch_road_vectors(self, *, force: bool = False) -> pd.DataFrame:
        """Fetch simplified road vectors with speed estimates.

        A cached table is returned when one is available and ``force`` is
        false. Otherwise the DataSF portal is pinged (placeholder for a real
        download; the response is ignored), a synthetic table is generated,
        and it is cached as parquet, or as CSV if the parquet write fails.

        Args:
            force: Ignore any cached table and regenerate.

        Returns:
            DataFrame with columns ``lon1, lat1, lon2, lat2, speed_mps``.
        """
        self._ensure_cache()
        parquet_path = self.cache_dir / "road_vectors.parquet"
        csv_path = parquet_path.with_suffix(".csv")

        if not force:
            cached = self._load_cached_roads(parquet_path, csv_path)
            if cached is not None:
                return cached

        # Placeholder network step: a real endpoint can be wired in here.
        try:
            requests.get("https://data.sfgov.org/api/odata2/", timeout=5)
        except (requests.RequestException, OSError):
            # Being offline is expected; the synthetic data below is the fallback.
            pass

        df = self._synthetic_roads()
        try:
            df.to_parquet(parquet_path, index=False)
        except Exception:
            # Deliberately broad: any parquet failure (e.g. no engine installed)
            # falls back to a CSV cache, which _load_cached_roads can read.
            df.to_csv(csv_path, index=False)
        return df

    def _synthetic_roads(self) -> pd.DataFrame:
        """Build a deterministic grid of horizontal road segments over SF.

        A 20 x 16 lattice spans lon [-122.53, -122.35] and lat [37.70, 37.83].
        Each segment joins two lon-adjacent lattice points at the same
        latitude and gets a speed drawn from a fixed-seed generator in
        [5, 7) m/s.
        """
        rng = np.random.default_rng(0)
        xs = np.linspace(-122.53, -122.35, 20)
        ys = np.linspace(37.70, 37.83, 16)
        # One rng draw per row, lon-major then lat, so the sequence is reproducible.
        rows = [
            (lon1, lat, lon2, lat, float(5.0 + 2.0 * rng.random()))
            for lon1, lon2 in zip(xs[:-1], xs[1:], strict=True)
            for lat in ys[:-1]
        ]
        return pd.DataFrame(rows, columns=list(self._ROAD_COLUMNS))

    @staticmethod
    def _cell_indices(coords: np.ndarray, lo: float, hi: float, count: int) -> np.ndarray:
        """Map coordinates to grid indices in ``[0, count - 1]``.

        The fractional position within ``[lo, hi]`` is scaled to the grid and
        truncated toward zero; out-of-range coordinates are clamped to the
        border cells. A zero-width range maps everything to index 0.
        """
        span = hi - lo
        if span == 0:
            return np.zeros(np.shape(coords), dtype=int)
        scaled = (coords - lo) / span * (count - 1)
        return np.clip(scaled.astype(int), 0, count - 1)

    def rasterize(self, bbox: BBox, res: int = 128) -> SFRaster:
        """Rasterize road vectors into an Eulerian grid over ``bbox``.

        Each segment's midpoint is binned to a cell and its unit direction
        times speed is accumulated there. Segments whose midpoint lies outside
        ``bbox`` are clamped onto the border cells.

        Args:
            bbox: ``(lon_min, lat_min, lon_max, lat_max)``.
            res: Number of cells along each axis; must be at least 1.

        Returns:
            ``SFRaster`` of four float32 ``(res, res)`` arrays. ``u``/``v``
            hold the accumulated velocity converted to degrees per second.
            NOTE(review): ``SFRaster`` labels these fields m/s -- confirm
            which unit consumers expect.

        Raises:
            ValueError: If ``res`` is smaller than 1.
        """
        lon_min, lat_min, lon_max, lat_max = bbox
        height = width = int(res)
        if width < 1:
            raise ValueError(f"res must be a positive integer, got {res!r}")

        lon_grid, lat_grid = np.meshgrid(
            np.linspace(lon_min, lon_max, width),
            np.linspace(lat_min, lat_max, height),
        )
        u_grid = np.zeros_like(lon_grid, dtype=np.float32)
        v_grid = np.zeros_like(lat_grid, dtype=np.float32)

        df = self.fetch_road_vectors()
        if not df.empty:
            cx = ((df["lon1"] + df["lon2"]) * 0.5).to_numpy()
            cy = ((df["lat1"] + df["lat2"]) * 0.5).to_numpy()
            dx = (df["lon2"] - df["lon1"]).to_numpy()
            dy = (df["lat2"] - df["lat1"]).to_numpy()
            speed = df["speed_mps"].to_numpy(dtype=np.float32)

            # Unit direction in lon/lat degree space; zero-length segments
            # keep a zero direction instead of dividing by zero.
            norm = np.hypot(dx, dy)
            norm = np.where(norm == 0, 1.0, norm)
            dirx = dx / norm
            diry = dy / norm

            # Rough meters per degree at SF latitude.
            m_per_deg_lat = 111_320.0
            m_per_deg_lon = 111_320.0 * math.cos(math.radians(37.77))
            ux = dirx * speed / m_per_deg_lon  # deg/s
            vy = diry * speed / m_per_deg_lat  # deg/s

            ix = self._cell_indices(cx, lon_min, lon_max, width)
            iy = self._cell_indices(cy, lat_min, lat_max, height)

            # ufunc.at is unbuffered, so segments sharing a cell all accumulate.
            np.add.at(u_grid, (iy, ix), ux.astype(np.float32))
            np.add.at(v_grid, (iy, ix), vy.astype(np.float32))

        return SFRaster(
            lon=lon_grid.astype(np.float32),
            lat=lat_grid.astype(np.float32),
            u=u_grid,
            v=v_grid,
        )