"""Spatial analysis module for MRT accessibility scoring.
This module provides geospatial analysis capabilities for computing MRT accessibility
scores using KDTree-based nearest-neighbor queries.
"""
from __future__ import annotations
import contextlib
import hashlib
import json
import logging
import os
import pickle
import numpy as np
import pandas as pd
try:
from sklearn.neighbors import KDTree
except Exception: # noqa: BLE001
KDTree = None
class TransportScorer:
    """Compute MRT accessibility scores via spatial nearest-neighbor queries.

    The scorer loads a catalog of MRT station coordinates and excludes all
    LRT stations using the regex '^(BP|S[WE]|P[WE])' on line codes. The
    pattern matches the Bukit Panjang (BP), Sengkang (SW/SE), and Punggol
    (PW/PE) LRT loops, so only heavy-rail stations are retained.

    A scikit-learn KDTree built over (lat, lon) in radians provides
    vectorized nearest-neighbor lookup across many records without Python
    loops; the reported distance is then recomputed with the haversine
    formula (see calculate_accessibility_score).

    Accessibility score definition
    ------------------------------
        score = max(0, 10 - (dist_km * 2))

    where dist_km is the great-circle (haversine) distance in kilometers
    from the HDB listing coordinate to the nearest MRT station in the
    filtered catalog.
    """
def __init__(
self, stations_df: pd.DataFrame | None = None, cache_dir: str | None = None
) -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self._stations: pd.DataFrame | None = None
self._tree: KDTree | None = None
self._cache_dir = cache_dir or os.path.join(os.getcwd(), ".cache_transport")
os.makedirs(self._cache_dir, exist_ok=True)
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(
"TransportScorer initialized with cache_dir=%s", self._cache_dir
)
if stations_df is not None:
self.load_stations(stations_df)
[docs]
@staticmethod
def _exclude_lrt(df: pd.DataFrame) -> pd.DataFrame:
"""Exclude LRT stations using strict regex on line codes.
Excludes station rows whose `line_code` matches '^(BP|S[WE]|P[WE])'.
Column expectations:
- name: station name (str)
- line_code: string line code such as 'NS', 'EW', 'DT', 'CC', 'BP', 'SW'
- lat, lon: numeric coordinates in degrees
"""
if "line_code" not in df.columns:
return df
mask = ~df["line_code"].astype(str).str.match(r"^(BP|S[WE]|P[WE])", na=False)
return df.loc[mask].copy()
[docs]
def _cache_paths(self, tag: str) -> tuple[str, str]:
key = hashlib.sha256(tag.encode("utf-8")).hexdigest()[:16]
return (
os.path.join(self._cache_dir, f"stations_{key}.pkl"),
os.path.join(self._cache_dir, f"kdtree_{key}.pkl"),
)
[docs]
def clear_cache(self) -> None:
"""Delete cached stations and KDTree files in the cache directory."""
try:
import glob
for p in glob.glob(os.path.join(self._cache_dir, "*.pkl")):
with contextlib.suppress(Exception):
os.remove(p)
if self.logger.isEnabledFor(logging.INFO):
self.logger.info("Cleared transport cache at %s", self._cache_dir)
except Exception:
pass
[docs]
def _try_load_cache(self, tag: str) -> bool:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.debug(
"Transport cache lookup tag=%s dir=%s", tag, self._cache_dir
)
stations_pkl, tree_pkl = self._cache_paths(tag)
try:
with open(stations_pkl, "rb") as f:
self._stations = pickle.load(f) # nosec B301
with open(tree_pkl, "rb") as f:
self._tree = pickle.load(f) # nosec B301
self.logger.info("Transport cache HIT: %s", tag)
return True
except Exception:
self.logger.info("Transport cache MISS: %s", tag)
return False
[docs]
def _save_cache(self, tag: str) -> None:
if self._stations is None or self._tree is None:
return
stations_pkl, tree_pkl = self._cache_paths(tag)
try:
with open(stations_pkl, "wb") as f:
pickle.dump(self._stations, f)
with open(tree_pkl, "wb") as f:
pickle.dump(self._tree, f)
self.logger.info("Saved transport cache for %s", tag)
except Exception:
pass
[docs]
def load_stations(self, stations_df: pd.DataFrame) -> None:
"""Load station catalog, exclude LRT, and build KDTree index.
Parameters
----------
stations_df : pd.DataFrame
DataFrame with columns: ['name', 'line_code', 'lat', 'lon'].
"""
if KDTree is None:
raise ImportError(
"scikit-learn not available. Install scikit-learn to use TransportScorer."
)
required = ["name", "lat", "lon"]
missing = [c for c in required if c not in stations_df.columns]
if missing:
raise ValueError(f"Stations catalog missing columns: {missing}")
df = stations_df.copy()
# Normalize columns
df.columns = pd.Index(
[str(c).strip().lower().replace(" ", "_") for c in df.columns]
)
# Try cache based on a stable tag (hash of station rows)
# Get hash of DataFrame - handle both ndarray and ExtensionArray
hash_values = pd.util.hash_pandas_object(df, index=True).values
if hasattr(hash_values, "tobytes"):
hash_bytes = hash_values.tobytes()
else:
# ExtensionArray - convert to numpy first
hash_bytes = hash_values.to_numpy().tobytes()
tag = "csv:" + hashlib.sha256(hash_bytes).hexdigest()[:16]
if self._try_load_cache(tag):
return
if "line_code" in df.columns:
df["line_code"] = df["line_code"].astype(str).str.upper()
# Fallback: if line_code missing, try to infer non-LRT by name not containing 'LRT'
df = self._exclude_lrt(df)
if "line_code" not in df.columns:
df = df[
~df.get("name", pd.Series(""))
.astype(str)
.str.contains("LRT", case=False, na=False)
]
df = df.dropna(subset=["lat", "lon"]).reset_index(drop=True)
if df.empty:
raise ValueError("No MRT stations available after LRT exclusion.")
coords = np.deg2rad(df[["lat", "lon"]].to_numpy(dtype=float))
# Use haversine-compatible KDTree by projecting to radians and custom metric
self._tree = KDTree(coords, metric="euclidean")
# TODO(research): Replace Euclidean distance with a Manhattan Distance heuristic that accounts for Singapore's road topology.
# In practice, first/last-mile access is constrained by the street graph; a city-block (L1) proxy or learned graph
# impedance model can better approximate walking accessibility than straight-line embeddings.
self._stations = df
self._save_cache(tag)
self.logger.info("Loaded %d MRT stations (LRT excluded)", len(df))
    def load_stations_geojson(self, path: str) -> None:
        """Load MRT stations from an LTA Exit GeoJSON file and build KDTree.

        The GeoJSON is expected to be a FeatureCollection whose features are
        station exits (Point geometries) with station info in `properties`.
        This loader will:

        - Extract station name and line code from common property keys,
          preserving robust fallback logic across GeoJSON variants
          (STATION_NA / STN_NAME / STN_NAM / NAME / etc.).
        - Strictly exclude LRT using the regex '^(BP|S[WE]|P[WE])' on line
          codes when available, and additionally drop any station whose name
          contains 'LRT' as a safety fallback.
        - Build a KDTree over exit coordinates. GeoJSON stores coordinates as
          [lon, lat]; they are reordered so the tree is built over (lat, lon)
          in radians. Using exits gives accurate pedestrian access points.

        Parameters
        ----------
        path : str
            Path to the GeoJSON file.

        Raises
        ------
        FileNotFoundError
            If *path* does not exist.
        ValueError
            If the file cannot be parsed, no exits are found, or no MRT
            stations remain after exclusion.
        """
        if KDTree is None:
            raise ImportError(
                "scikit-learn not available. Install scikit-learn to use TransportScorer."
            )
        # Cache tag keyed on basename + mtime + size for stability; fall back
        # to a name-only tag if the file cannot be stat'ed.
        try:
            stat = os.stat(path)
            tag = (
                f"geojson:{os.path.basename(path)}:{int(stat.st_mtime)}:{stat.st_size}"
            )
            if self._try_load_cache(tag):
                return
        except Exception:
            tag = f"geojson:{os.path.basename(path)}"
        try:
            with open(path, encoding="utf-8") as f:
                gj = json.load(f)
        except FileNotFoundError:
            # Missing file propagates unchanged so callers can distinguish it.
            raise
        except Exception as exc:  # noqa: BLE001
            self.logger.exception("Failed to read GeoJSON: %s", path)
            raise ValueError(f"Failed to read GeoJSON: {path}") from exc
        features = gj.get("features", []) if isinstance(gj, dict) else []
        rows: list[tuple[str, str | None, float, float]] = []
        for feat in features:
            # Per-feature failures are skipped so one malformed exit does not
            # abort the whole load.
            try:
                geom = feat.get("geometry", {})
                if not geom or geom.get("type") != "Point":
                    continue
                coords = geom.get("coordinates")
                if not coords or len(coords) < 2:
                    continue
                # GeoJSON coordinate order is [lon, lat].
                lon, lat = float(coords[0]), float(coords[1])
                props = feat.get("properties", {}) or {}
                # CRITICAL: candidate keys for station name - preserving the
                # robust fallback chain across dataset variants.
                name = (
                    props.get("STATION_NA")
                    or props.get("STN_NAME")
                    or props.get("STN_NAM")
                    or props.get("NAME")
                    or props.get("StationName")
                    or props.get("stn_name")
                    or props.get("station")
                )
                line_code = (
                    props.get("LINE")
                    or props.get("LINE_CODE")
                    or props.get("LINE_N")
                    or props.get("MRT_LINE")
                    or props.get("railLine")
                    or props.get("line")
                )
                if name is None:
                    # Some datasets store station at 'STN_NAM' with exit codes
                    # in separate features; skip unnamed ones.
                    continue
                name_str = str(name).strip()
                lc_str = (
                    str(line_code).strip().upper() if line_code is not None else None
                )
                rows.append((name_str, lc_str, lat, lon))
            except Exception:  # noqa: BLE001
                continue
        if not rows:
            raise ValueError("No station exits parsed from GeoJSON.")
        df = pd.DataFrame(rows, columns=["name", "line_code", "lat", "lon"])
        # Exclude LRT by line_code regex when available; also filter names
        # containing 'LRT'.
        df = self._exclude_lrt(df)
        df = df[~df["name"].astype(str).str.contains("LRT", case=False, na=False)]
        df = df.dropna(subset=["lat", "lon"]).reset_index(drop=True)
        if df.empty:
            raise ValueError("No MRT stations available after parsing/exclusion.")
        coords = np.deg2rad(df[["lat", "lon"]].to_numpy(dtype=float))
        self._tree = KDTree(coords, metric="euclidean")
        # TODO(research): replace the straight-line (radian-space Euclidean)
        # impedance with a proxy calibrated against true walking times
        # (e.g. OSRM/OneMap).
        self._stations = df
        self._save_cache(tag)
        self.logger.info("Loaded %d MRT exits from GeoJSON (LRT excluded)", len(df))
[docs]
@staticmethod
def _haversine_meters(
latlon1: np.ndarray[tuple[int, int], np.dtype[np.float64]],
latlon2: np.ndarray[tuple[int, int], np.dtype[np.float64]],
) -> np.ndarray[tuple[int], np.dtype[np.float64]]:
"""Compute haversine distance in meters between arrays of points.
Parameters
----------
latlon1 : np.ndarray
Array of shape (n, 2) with columns [lat_rad, lon_rad] in radians.
latlon2 : np.ndarray
Array of shape (n, 2) with columns [lat_rad, lon_rad] in radians.
"""
earth_radius_m = 6371000.0 # meters
dlat = latlon2[:, 0] - latlon1[:, 0]
dlon = latlon2[:, 1] - latlon1[:, 1]
a = (
np.sin(dlat / 2.0) ** 2
+ np.cos(latlon1[:, 0]) * np.cos(latlon2[:, 0]) * np.sin(dlon / 2.0) ** 2
)
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
return earth_radius_m * c # type: ignore[no-any-return]
[docs]
def calculate_accessibility_score(self, df: pd.DataFrame) -> pd.DataFrame:
"""Annotate DataFrame with nearest MRT and accessibility score.
Adds columns:
- Nearest_MRT: name of nearest heavy-rail MRT station
- Dist_m: distance to nearest station in meters
- Accessibility_Score: score = max(0, 10 - (dist_km * 2))
Expectations: Input `df` must have 'lat' and 'lon' columns (degrees).
"""
if self._tree is None or self._stations is None:
raise RuntimeError(
"TransportScorer not initialized. Call load_stations first."
)
if not {"lat", "lon"}.issubset(set(df.columns)):
# No-op if coords not present; add NaNs
out = df.copy()
out["Nearest_MRT"] = np.nan
out["Dist_m"] = np.nan
out["Accessibility_Score"] = np.nan
return out
pts = np.deg2rad(df[["lat", "lon"]].to_numpy(dtype=float))
# KDTree query (k=1)
dist_rad, ind = self._tree.query(pts, k=1)
# Convert chord distance in radian-space to meters using haversine to nearest
nearest_rad = self._stations[["lat", "lon"]].to_numpy(dtype=float)
nearest_rad = np.deg2rad(nearest_rad[ind.flatten()])
dist_m = self._haversine_meters(pts, nearest_rad)
# Score: max(0, 10 - 2 * km)
# TODO(limitations): Currently treats distance as a linear proxy for access cost and implicitly assumes constant walking speed.
# Future versions should implement variable speeds based on agent demographics (e.g., elderly vs. youth) and street-network
# impedance (slopes, crossings, sheltered paths), ideally calibrated from observed travel-time datasets.
dist_km = dist_m / 1000.0
score = np.maximum(0.0, 10.0 - (dist_km * 2.0))
out = df.copy()
out["Nearest_MRT"] = self._stations.loc[ind.flatten(), "name"].to_numpy()
out["Dist_m"] = dist_m
out["Accessibility_Score"] = score
return out