Source code for unsprawl.fetch

"""Data fetching utilities for downloading HDB and MRT datasets.

This module handles downloading datasets from Data.gov.sg APIs and provides fallback
synthetic data generation when official sources are unavailable.
"""

from __future__ import annotations

import csv
import json
import logging
import os
import random

import pandas as pd
import requests

# Official Dataset IDs (verified as of Dec 2025)
DATASET_IDS = {
    # "Resale flat prices based on registration date from Jan-2017 onwards"
    "hdb_resale": "d_8b84c4ee58e3cfc0ece0d773c8ca6abc",
    # "LTA MRT Station Exit" (User provided)
    "mrt_exits": "d_b39d3a0871985372d7e1637193335da5",
}
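# Each ID is passed to the Data.gov.sg "initiate-download" endpoint
# (https://api-open.data.gov.sg/v1/public/api/datasets/<id>/initiate-download),
# which returns a temporary download URL; see api_get_download_url() below.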

# Standard paths for datasets
DEFAULT_HDB_PATH = os.path.join(
    "ResaleFlatPrices",
    "Resale flat prices based on registration date from Jan-2017 onwards.csv",
)
DEFAULT_MRT_PATH = os.path.join(".data", "LTAMRTStationExitGEOJSON.geojson")

# Expected HDB resale columns for schema validation
EXPECTED_HDB_COLUMNS = {
    "month",
    "town",
    "flat_type",
    "block",
    "street_name",
    "storey_range",
    "floor_area_sqm",
    "flat_model",
    "lease_commence_date",
    "remaining_lease",
    "resale_price",
}

# Expected MRT columns (after normalization)
EXPECTED_MRT_COLUMNS = {"name", "lat", "lon"}


def validate_hdb_schema(path: str) -> bool:
    """Validate that a CSV file has the expected HDB resale schema.

    Parameters
    ----------
    path : str
        Path to the CSV file to validate.

    Returns
    -------
    bool
        True if the file exists and has the expected columns, False otherwise.
    """
    if not os.path.exists(path):
        return False
    try:
        # Read just the header to check columns
        df = pd.read_csv(path, nrows=0)
        # Normalize column names
        cols = {str(c).strip().lower().replace(" ", "_") for c in df.columns}
        # Check if all expected columns are present
        return EXPECTED_HDB_COLUMNS.issubset(cols)
    except Exception:
        return False


def validate_mrt_schema(path: str) -> bool:
    """Validate that a GeoJSON or CSV file has the expected MRT schema.

    Parameters
    ----------
    path : str
        Path to the GeoJSON or CSV file to validate.

    Returns
    -------
    bool
        True if the file exists and has the expected structure, False otherwise.
    """
    if not os.path.exists(path):
        return False
    try:
        if path.lower().endswith(".geojson") or path.lower().endswith(".json"):
            # Validate GeoJSON structure
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, dict) or data.get("type") != "FeatureCollection":
                return False
            features = data.get("features", [])
            if not features:
                return False
            # Check that at least one feature has the expected structure
            for feat in features[:5]:  # Check first 5 features
                if not isinstance(feat, dict):
                    continue
                geom = feat.get("geometry", {})
                if geom.get("type") != "Point":
                    continue
                props = feat.get("properties", {})
                if not props:
                    continue
                # Look for name/station name fields (case-insensitive)
                prop_keys_lower = [k.lower() for k in props]
                has_name = any(
                    "name" in k or "stn" in k or "station" in k
                    for k in prop_keys_lower
                )
                if has_name:
                    return True
            return False
        # Validate CSV structure
        df = pd.read_csv(path, nrows=0)
        cols = {str(c).strip().lower().replace(" ", "_") for c in df.columns}
        return EXPECTED_MRT_COLUMNS.issubset(cols)
    except Exception:
        return False


def prompt_user_download(dataset_name: str) -> bool:
    """Prompt user for permission to download a dataset.

    Parameters
    ----------
    dataset_name : str
        Name of the dataset to download ("HDB resale data" or "MRT stations data").

    Returns
    -------
    bool
        True if user approves download, False otherwise.
    """
    # In non-interactive environments (pytest, CI), return False immediately
    import sys

    if not sys.stdin.isatty():
        return False
    print(f"\n⚠️ {dataset_name} not found or has invalid schema.")
    print("Would you like to download it now? (y/n): ", end="", flush=True)
    try:
        response = input().strip().lower()
        return response in ("y", "yes")
    except (EOFError, KeyboardInterrupt, OSError):
        print()
        return False


def api_get_download_url(dataset_id: str, verbose: int = 0) -> str | None:
    """Hit the initiate-download endpoint to get the temporary S3 URL."""
    log = logging.getLogger("fetch")
    endpoint = (
        f"https://api-open.data.gov.sg/v1/public/api/datasets/{dataset_id}/initiate-download"
    )
    try:
        if verbose:
            log.info(f"Requesting download URL for ID: {dataset_id}")
        resp = requests.get(endpoint, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        # API returns { "code": 0, "data": { "url": "..." }, ... }
        url = data.get("data", {}).get("url")
        return str(url) if url else None
    except Exception as e:
        log.warning(f"API request failed for {dataset_id}: {e}")
        return None


def generate_synthetic_hdb(path: str, limit: int) -> None:
    """Generate synthetic HDB resale dataset for testing."""
    cols = [
        "month",
        "town",
        "flat_type",
        "block",
        "street_name",
        "storey_range",
        "floor_area_sqm",
        "flat_model",
        "lease_commence_date",
        "remaining_lease",
        "resale_price",
    ]
    towns = [
        "ANG MO KIO", "BEDOK", "BISHAN", "BUKIT BATOK", "BUKIT MERAH",
        "BUKIT PANJANG", "BUKIT TIMAH", "CENTRAL AREA", "CHOA CHU KANG",
        "CLEMENTI", "GEYLANG", "HOUGANG", "JURONG EAST", "JURONG WEST",
        "KALLANG/WHAMPOA", "MARINE PARADE", "PASIR RIS", "PUNGGOL",
        "QUEENSTOWN", "SEMBAWANG", "SENGKANG", "SERANGOON", "TAMPINES",
        "TOA PAYOH", "WOODLANDS", "YISHUN",
    ]
    types = ["2 ROOM", "3 ROOM", "4 ROOM", "5 ROOM", "EXECUTIVE"]
    models = [
        "Improved", "New Generation", "Model A", "Standard",
        "Simplified", "Premium Apartment", "Apartment", "Maisonette",
    ]
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=cols)
        w.writeheader()
        for _i in range(min(limit, 10000)):
            year = 2017 + random.randint(0, 3)
            month = f"{year}-{random.randint(1, 12):02d}"
            town = random.choice(towns)
            ft = random.choice(types)
            block = str(random.randint(1, 999))
            street = f"STREET {chr(65 + random.randint(0, 25))}"
            sm = 1 + 3 * random.randint(0, 13)
            sr = f"{sm:02d} TO {sm + 2:02d}"
            area = random.choice(
                {
                    "2 ROOM": [35, 45],
                    "3 ROOM": [60, 65, 70],
                    "4 ROOM": [80, 90, 95],
                    "5 ROOM": [105, 110, 120],
                    "EXECUTIVE": [120, 130, 140],
                }.get(ft, [60, 90, 110])
            )
            model = random.choice(models)
            lcy = random.randint(1970, 2015)
            years_elapsed = max(0, year - lcy)
            remy = max(1, 99 - years_elapsed)
            rem = f"{remy} years {random.randint(0, 11):02d} months"
            base_psm = 4000 + random.randint(-300, 600)
            lease_factor = 0.6 + 0.4 * (remy / 99)
            town_factor = 0.9 + 0.2 * (
                town in {"QUEENSTOWN", "BISHAN", "BUKIT TIMAH", "MARINE PARADE"}
            )
            price = int(
                area * base_psm * lease_factor * town_factor * (0.9 + 0.2 * random.random())
            )
            w.writerow(
                {
                    "month": month,
                    "town": town,
                    "flat_type": ft,
                    "block": block,
                    "street_name": street,
                    "storey_range": sr,
                    "floor_area_sqm": area,
                    "flat_model": model,
                    "lease_commence_date": lcy,
                    "remaining_lease": rem,
                    "resale_price": price,
                }
            )
    logging.getLogger("fetch").info("Wrote synthetic dataset to %s", path)


def generate_synthetic_mrt(path: str) -> None:
    """Generate synthetic MRT GeoJSON for testing."""
    sample = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [103.851959, 1.290270]},
                "properties": {"STN_NAME": "RAFFLES PLACE", "LINE": "NS"},
            },
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [103.845, 1.3008]},
                "properties": {"STN_NAME": "BUGIS", "LINE": "DT"},
            },
        ],
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(sample, f)


def download_file(url: str, dest_path: str) -> None:
    """Stream the file from the S3 URL to disk."""
    log = logging.getLogger("fetch")
    log.info(f"Downloading to {dest_path}...")
    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    log.info("Download complete.")


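# Illustrative sketch (not part of the module's public API): how the two
# low-level helpers above compose into a manual fetch. The function name and
# the destination CSV path below are hypothetical examples.
def _example_manual_fetch() -> bool:
    """Fetch the HDB resale CSV by hand using the helpers above (example only)."""
    url = api_get_download_url(DATASET_IDS["hdb_resale"], verbose=1)
    if not url:
        return False
    download_file(url, "hdb_resale_example.csv")  # hypothetical destination path
    return validate_hdb_schema("hdb_resale_example.csv")

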
def fetch_hdb_data(limit: int, out_dir: str, filename: str, verbose: int = 0) -> int:
    """Fetch HDB resale dataset using the official Data.gov.sg 'initiate-download' API."""
    log = logging.getLogger("fetch")

    # Create output directory
    os.makedirs(out_dir, exist_ok=True)
    hdb_out_path = os.path.join(out_dir, filename)

    # Try API first
    success_hdb = False
    url_hdb = api_get_download_url(DATASET_IDS["hdb_resale"], verbose=verbose)
    if url_hdb:
        try:
            download_file(url_hdb, hdb_out_path)
            success_hdb = True
        except Exception as e:
            log.error(f"Failed to download HDB data from URL: {e}")

    # Fall back to synthetic data if the API failed
    if not success_hdb:
        log.warning("Could not fetch official HDB data. Generating synthetic dataset...")
        generate_synthetic_hdb(hdb_out_path, limit)
    return 0


def fetch_mrt_data(mrt_out_path: str, verbose: int = 0) -> int:
    """Fetch MRT GeoJSON dataset."""
    log = logging.getLogger("fetch")
    url = api_get_download_url(DATASET_IDS["mrt_exits"], verbose=verbose)

    # Ensure output directory exists
    os.makedirs(os.path.dirname(mrt_out_path) or ".", exist_ok=True)

    # Download
    if url:
        try:
            log.info(f"Downloading MRT Data to {mrt_out_path}...")
            with requests.get(url, stream=True, timeout=30) as r:
                r.raise_for_status()
                with open(mrt_out_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            return 0
        except Exception as e:
            log.error(f"MRT download failed: {e}")
    log.warning(
        "Could not fetch official MRT data (url not available) - generating synthetic."
    )

    # Synthetic fallback
    log.warning("Generating synthetic MRT GeoJSON...")
    generate_synthetic_mrt(mrt_out_path)
    return 0


def ensure_hdb_dataset(path: str, verbose: int = 0) -> bool:
    """Ensure HDB dataset exists and has valid schema, downloading if necessary.

    Parameters
    ----------
    path : str
        Path where the HDB dataset should exist.
    verbose : int
        Verbosity level for logging.

    Returns
    -------
    bool
        True if dataset is available and valid, False if user declined download
        or download failed.
    """
    log = logging.getLogger("DatasetManager")

    # Check if file exists and has valid schema
    if validate_hdb_schema(path):
        if verbose >= 1:
            log.info(f"HDB dataset found and validated at: {path}")
        return True

    # Prompt user for download
    if not prompt_user_download("HDB resale data"):
        return False

    # Download the dataset
    print(f"📥 Downloading HDB resale data to {path}...")
    out_dir = os.path.dirname(path) or "."
    filename = os.path.basename(path)
    try:
        result = fetch_hdb_data(
            limit=0, out_dir=out_dir, filename=filename, verbose=verbose
        )
        if result == 0 and validate_hdb_schema(path):
            print("✅ HDB dataset downloaded successfully!")
            return True
        print("❌ Failed to download or validate HDB dataset.")
        return False
    except Exception as e:
        log.error(f"Error downloading HDB dataset: {e}")
        print(f"❌ Error downloading HDB dataset: {e}")
        return False


def ensure_mrt_dataset(path: str, verbose: int = 0) -> bool:
    """Ensure MRT dataset exists and has valid schema, downloading if necessary.

    Parameters
    ----------
    path : str
        Path where the MRT dataset should exist.
    verbose : int
        Verbosity level for logging.

    Returns
    -------
    bool
        True if dataset is available and valid, False if user declined download
        or download failed.
    """
    log = logging.getLogger("DatasetManager")

    # Check if file exists and has valid schema
    if validate_mrt_schema(path):
        if verbose >= 1:
            log.info(f"MRT dataset found and validated at: {path}")
        return True

    # Prompt user for download
    if not prompt_user_download("MRT stations data"):
        return False

    # Download the dataset
    print(f"📥 Downloading MRT stations data to {path}...")
    try:
        result = fetch_mrt_data(path, verbose=verbose)
        if result == 0 and validate_mrt_schema(path):
            print("✅ MRT dataset downloaded successfully!")
            return True
        print("❌ Failed to download or validate MRT dataset.")
        return False
    except Exception as e:
        log.error(f"Error downloading MRT dataset: {e}")
        print(f"❌ Error downloading MRT dataset: {e}")
        return False


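# Minimal usage sketch (illustrative; assumes the module is run directly rather
# than imported): ensure both datasets exist at their default paths, downloading
# with user confirmation or falling back to synthetic data.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    hdb_ok = ensure_hdb_dataset(DEFAULT_HDB_PATH, verbose=1)
    mrt_ok = ensure_mrt_dataset(DEFAULT_MRT_PATH, verbose=1)
    print(f"HDB dataset ready: {hdb_ok}; MRT dataset ready: {mrt_ok}")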