Source code for unsprawl.models

"""Financial modeling module for HDB valuation.

This module contains the core valuation logic including:
- Bala's Curve implementation for non-linear lease depreciation
- Feature engineering for price efficiency metrics
- Valuation scoring with growth potential analysis
"""

from __future__ import annotations

import logging
import re
from typing import Any

import numpy as np
import pandas as pd

from unsprawl.loader import Schema


class LeaseDepreciationModel:
    """Non-linear lease depreciation model (Bala's Curve approximation).

    A 99-year HDB lease does not lose value linearly: value holds up well
    early in the lease and falls increasingly fast as expiry approaches.
    This class models that with a stretched-exponential decay curve.

    Mathematical Model
    ------------------
    factor = exp(-k * ((max_lease - remaining) / max_lease) ** n)

    Where:
    - remaining: years of lease remaining (clipped to [0, max_lease])
    - k: decay rate parameter (default: 3.0)
    - n: curve steepness (default: 2.5)

    With the default parameters this evaluates to approximately:
    - remaining = 99 years -> 1.00
    - remaining = 80 years -> 0.95
    - remaining = 50 years -> 0.60
    - remaining = 20 years -> 0.18
    - remaining = 0 years  -> 0.05 (exp(-k); the factor never reaches 0)

    References
    ----------
    This approximates the observed market behavior described in academic
    literature on HDB lease decay, including Bala's studies on Singapore
    public housing valuation.
    """

    def __init__(
        self, max_lease: float = 99.0, decay_rate: float = 3.0, steepness: float = 2.5
    ) -> None:
        """Initialize the lease depreciation model.

        Parameters
        ----------
        max_lease : float
            Maximum lease period in years (default: 99.0 for HDB). Must be a
            finite, strictly positive number because it is used as a divisor.
        decay_rate : float
            Controls overall depreciation intensity (higher = more aggressive decay).
        steepness : float
            Controls curve shape (higher = sharper decline near end of lease).

        Raises
        ------
        ValueError
            If ``max_lease`` is not finite and strictly positive.
        """
        # max_lease is the denominator of the normalized age; zero or a
        # negative value would silently yield NaN/inf factors downstream.
        if not (np.isfinite(max_lease) and max_lease > 0):
            raise ValueError(
                f"max_lease must be a finite positive number, got {max_lease!r}"
            )
        self.max_lease = max_lease
        self.decay_rate = decay_rate
        self.steepness = steepness
        self.logger = logging.getLogger(self.__class__.__name__)

    def compute_depreciation_factor(
        self, remaining_years: pd.Series[float] | float
    ) -> pd.Series[float] | float:
        """Compute non-linear depreciation factor for given remaining lease years.

        Parameters
        ----------
        remaining_years : pd.Series | float
            Remaining lease in years (can be Series or scalar). Missing values
            (``None``/NaN) and, for a Series, non-numeric entries yield NaN.

        Returns
        -------
        pd.Series | float
            Depreciation factor between 0 and 1, where 1 = no depreciation.
            A Series input returns a Series on the same index; any other
            input is treated as a scalar and returns a float.
        """
        is_series = isinstance(remaining_years, pd.Series)
        if is_series:
            # Coerce first so object/nullable dtypes (e.g. a column holding
            # None) become float64 with NaN instead of breaking np.clip.
            remaining = pd.to_numeric(remaining_years, errors="coerce").to_numpy(
                dtype="float64", na_value=np.nan
            )
        else:
            # dtype=float64 maps None to NaN rather than an object array.
            remaining = np.asarray([remaining_years], dtype="float64")

        # Normalized age: 0 = brand-new lease, 1 = expired lease.
        clipped = np.clip(remaining, 0.0, self.max_lease)
        normalized_age = (self.max_lease - clipped) / self.max_lease

        # Non-linear decay curve: exp(-k * age^n).
        with np.errstate(over="ignore", invalid="ignore"):
            factor = np.exp(-self.decay_rate * np.power(normalized_age, self.steepness))

        # Keep the factor inside [0, 1] and make NaN passthrough explicit.
        factor = np.clip(factor, 0.0, 1.0)
        factor = np.where(np.isnan(remaining), np.nan, factor)

        if is_series:
            return pd.Series(factor, index=remaining_years.index)
        return float(factor[0])

    def adjust_price_efficiency(
        self, base_efficiency: pd.Series[float], remaining_years: pd.Series[float]
    ) -> pd.Series[float]:
        """Adjust price efficiency using non-linear lease depreciation.

        The base efficiency is divided by the depreciation factor, so a lower
        factor (more depreciation) increases the effective price per
        area-year and makes shorter-lease properties look more expensive on a
        value-adjusted basis.

        Parameters
        ----------
        base_efficiency : pd.Series
            Base price efficiency (price / (area * remaining_years)).
        remaining_years : pd.Series
            Remaining lease years for each property.

        Returns
        -------
        pd.Series
            Lease-adjusted price efficiency; non-finite results become NaN.
        """
        factor = self.compute_depreciation_factor(remaining_years)

        with np.errstate(divide="ignore", invalid="ignore"):
            adjusted = base_efficiency / factor

        # Division can produce +/-inf (factor underflowing to 0); report
        # those as missing rather than as extreme values.
        return adjusted.mask(~np.isfinite(adjusted), np.nan)
class FeatureEngineer:
    """Engineer features required for valuation.

    Responsibilities
    ----------------
    - Parse remaining lease strings of the form "85 years 3 months" into a
      float in units of years (e.g., 85.25) with robust handling of edge cases.
    - Compute price efficiency as:
          resale_price / (floor_area_sqm * remaining_lease_years)
    - Apply non-linear lease depreciation adjustment via LeaseDepreciationModel

    Mathematical Notes
    ------------------
    Price efficiency penalizes larger prices per effective area-year. By
    dividing price by both floor area (sqm) and remaining lease (years), the
    metric naturally adjusts for lease decay. The non-linear depreciation
    model further refines this by accounting for the accelerating loss of
    value as lease expiry approaches.

    Notes
    -----
    The DataFrame-level methods add columns to the frame they are given and
    return that same frame; they do not copy it.
    """

    _LEASE_YEARS_RE = re.compile(r"(?P<years>\d+)\s*year")
    _LEASE_MONTHS_RE = re.compile(r"(?P<months>\d+)\s*month")

    def __init__(
        self,
        schema: Schema | None = None,
        use_lease_depreciation: bool = True,
        depreciation_model: LeaseDepreciationModel | None = None,
    ) -> None:
        """Initialize FeatureEngineer with optional lease depreciation model.

        Parameters
        ----------
        schema : Schema | None
            Schema definition for column names. A default ``Schema()`` is
            created when omitted.
        use_lease_depreciation : bool
            Whether to apply non-linear lease depreciation adjustment (default: True).
        depreciation_model : LeaseDepreciationModel | None
            Custom depreciation model. If None and use_lease_depreciation=True,
            creates default LeaseDepreciationModel. Ignored when
            use_lease_depreciation=False.
        """
        self.schema = schema or Schema()
        self.logger = logging.getLogger(self.__class__.__name__)
        self.use_lease_depreciation = use_lease_depreciation
        if use_lease_depreciation:
            self.depreciation_model = depreciation_model or LeaseDepreciationModel()
        else:
            self.depreciation_model = None

    def _parse_lease_text(self, text: str | float | None) -> float | None:
        """Parse a remaining lease value into float years.

        Examples
        --------
        - "85 years 3 months" -> 85.25
        - "99 years" -> 99.0
        - "8 months" -> 0.666...
        - "0 years" -> 0.0
        - "less than 1 year" -> 0.5 (conservative placeholder)
        - "85" -> 85.0 (pure-number fallback)

        Parameters
        ----------
        text : str | float | int | None
            Raw value from the dataset.

        Returns
        -------
        Optional[float]
            Parsed years as float, or None if the value is missing,
            unparseable, negative, or non-finite.
        """
        if text is None:
            return None

        # Already numeric (rare): accept only finite, non-negative values.
        if isinstance(text, float):
            value = float(text)
            return value if np.isfinite(value) and value >= 0 else None

        s = str(text).strip().lower()
        if s in {"na", "n/a", "nan", "", "none"}:
            return None
        if "less than 1 year" in s:
            # Conservative assumption when the source is only qualitative.
            return 0.5

        y_match = self._LEASE_YEARS_RE.search(s)
        m_match = self._LEASE_MONTHS_RE.search(s)

        if y_match is None and m_match is None:
            # Pure-number fallback (e.g., "85"). Decide on whether anything
            # matched, not on the matched values, so that "0 years" is a
            # valid 0.0 rather than a parse failure.
            try:
                value = float(s)
            except ValueError:
                return None
            # Same acceptance rule as the numeric branch above.
            return value if np.isfinite(value) and value >= 0 else None

        years = float(y_match.group("years")) if y_match else 0.0
        months = float(m_match.group("months")) if m_match else 0.0
        return years + months / 12.0

    def _infer_remaining_lease_from_commence(
        self, df: pd.DataFrame, assumed_lease_years: float = 99.0
    ) -> pd.Series[float]:
        """Infer remaining lease (years) from `lease_commence_date` and `month` columns.

        Mathematics
        -----------
        remaining_years = assumed_lease_years - (frac_year - lease_commence_year)
        frac_year       = year + (month - 1) / 12

        where (year, month) come from the transaction month string "YYYY-MM"
        (a missing month part is treated as January). Values are clipped to
        [0, assumed_lease_years]. Non-parsable rows yield NaN.

        Parameters
        ----------
        df : pd.DataFrame
            Frame expected to hold the literal columns ``month`` and
            ``lease_commence_date``.
        assumed_lease_years : float
            Total lease length assumed for every row (default: 99.0).

        Returns
        -------
        pd.Series
            Inferred remaining lease in years on ``df.index``; all NaN when
            either source column is absent.
        """
        month_col = df.get("month")
        commence_col = df.get("lease_commence_date")
        if month_col is None or commence_col is None:
            return pd.Series(np.nan, index=df.index, dtype="float64")

        commence_year = pd.to_numeric(commence_col, errors="coerce")

        # Slice "YYYY-MM" positionally: chars 0-3 are the year, 5-6 the month.
        ym = month_col.astype(str).str.strip()
        year = pd.to_numeric(ym.str.slice(0, 4), errors="coerce")
        mon = pd.to_numeric(ym.str.slice(5, 7), errors="coerce")

        frac_year = year + (mon.fillna(1) - 1) / 12.0
        remaining = assumed_lease_years - (frac_year - commence_year)
        remaining = remaining.where(np.isfinite(remaining))
        return remaining.clip(lower=0.0, upper=assumed_lease_years)

    def parse_remaining_lease(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a `remaining_lease_years` float column to the DataFrame.

        Resolution order:

        1. If the schema's years column already exists it is coerced to
           numeric and returned as-is.
        2. Otherwise the schema's raw lease text column is parsed, if present.
        3. If there is no raw column, or more than half of the parsed values
           are NaN, the gaps are filled by inference from
           (`lease_commence_date`, `month`) assuming a 99-year lease.

        All parsing errors coerce to NaN. A warning is logged when no row
        ends up with a usable value.

        Parameters
        ----------
        df : pd.DataFrame
            Input dataframe (modified in place).

        Returns
        -------
        pd.DataFrame
            DataFrame with an added/updated `remaining_lease_years` column.
        """
        col_raw = self.schema.remaining_lease_raw
        col_years = self.schema.remaining_lease_years

        self.logger.info("Parsing remaining lease into years")

        if col_years in df.columns:
            df[col_years] = pd.to_numeric(df[col_years], errors="coerce")
            return df

        parsed: pd.Series[float] | None = None
        if col_raw in df.columns:
            parsed = pd.Series(
                [self._parse_lease_text(val) for val in df[col_raw].tolist()],
                index=df.index,
                dtype="float64",
            )

        # Fallback inference when parsed is missing or largely NaN.
        if parsed is None or parsed.isna().mean() > 0.5:
            self.logger.info(
                "Inferring remaining lease from lease_commence_date and month (99-year assumption)"
            )
            inferred = self._infer_remaining_lease_from_commence(
                df, assumed_lease_years=99.0
            )
            parsed = inferred if parsed is None else parsed.fillna(inferred)

        # `parsed` is always a Series here, so "no information" shows up as
        # an all-NaN result rather than as None.
        if len(parsed) > 0 and parsed.isna().all():
            self.logger.warning(
                "No remaining lease information available; creating NaN column"
            )

        df[col_years] = pd.to_numeric(parsed, errors="coerce")
        return df

    def compute_price_efficiency(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute price efficiency metric with optional non-linear lease depreciation.

        Formula (Base)
        --------------
        price_efficiency = resale_price / (floor_area_sqm * remaining_lease_years)

        Formula (With Depreciation Adjustment)
        ---------------------------------------
        price_efficiency_adjusted = base_efficiency / depreciation_factor(remaining_lease)

        where depreciation_factor ∈ [0, 1] is computed by the configured
        depreciation model.

        Interpretation
        --------------
        Lower values indicate better cost per area-year. The depreciation
        adjustment increases the effective price for properties with shorter
        leases.

        Parameters
        ----------
        df : pd.DataFrame
            Frame holding the schema's resale price, floor area and
            remaining-lease-years columns (modified in place).

        Returns
        -------
        pd.DataFrame
            The same frame with the schema's price efficiency column added.
            The column is all NaN when a required input column is missing;
            non-finite results (e.g. zero denominator) are NaN.
        """
        s = self.schema
        required = [s.resale_price, s.floor_area, s.remaining_lease_years]
        missing = [c for c in required if c not in df.columns]
        if missing:
            self.logger.error(
                "Missing required columns for price efficiency: %s", missing
            )
            df[s.price_efficiency] = np.nan
            return df

        self.logger.info(
            "Computing price efficiency (lease depreciation: %s)",
            self.use_lease_depreciation,
        )

        # Base efficiency: price per (sqm * remaining year).
        denom = df[s.floor_area] * df[s.remaining_lease_years]
        with np.errstate(divide="ignore", invalid="ignore"):
            base_efficiency = df[s.resale_price] / denom
        base_efficiency = base_efficiency.mask(~np.isfinite(base_efficiency), np.nan)

        if self.use_lease_depreciation and self.depreciation_model is not None:
            self.logger.debug(
                "Applying non-linear lease depreciation adjustment (Bala's Curve)"
            )
            df[s.price_efficiency] = self.depreciation_model.adjust_price_efficiency(
                base_efficiency, df[s.remaining_lease_years]
            )
        else:
            df[s.price_efficiency] = base_efficiency

        return df
class ValuationEngine:
    """Compute group-wise Z-Scores, growth potential, and a final valuation score.

    Methodology
    -----------
    1) Compute Z-Score of `price_efficiency` within groups defined by
       configurable grouping keys (default: (town, flat_type)):

           z = (x - mu) / sigma

       where x is the observation's price_efficiency, mu is the group mean,
       and sigma is the group standard deviation. Any non-finite z (sigma is
       0 or NaN, or x itself is NaN) is set to 0.

    2) Define Valuation_Score = -Z_Price_Efficiency so that higher scores
       indicate better (cheaper-than-peers) properties.

    3) Compute Growth_Potential from Price-per-Sqm vs the (town, flat_type)
       average:
       - Deep Value (High Growth): Unit PSM < 0.85 × Town Avg PSM
       - Fair Value (Moderate Growth): 0.85 ≤ Unit PSM < 1.0 × Town Avg PSM
       - Premium (Low Growth): Unit PSM ≥ 1.0 × Town Avg PSM

    Notes
    -----
    The methods add columns to the DataFrame they are given and return that
    same frame; they do not copy it.
    """

    def __init__(self, schema: Schema | None = None) -> None:
        """Create the engine.

        Parameters
        ----------
        schema : Schema | None
            Schema definition for column names. A default ``Schema()`` is
            created when omitted.
        """
        self.schema = schema or Schema()
        self.logger = logging.getLogger(self.__class__.__name__)

    def _groupwise_zscore(
        self, series: pd.Series[float], groups: pd.Series[str]
    ) -> pd.Series[float]:
        """Compute group-wise Z-Score with robust handling of zero std.

        Parameters
        ----------
        series : pd.Series
            Numeric series to standardize.
        groups : pd.Series
            Group labels of same length as series.

        Returns
        -------
        pd.Series
            Group-wise z-scores. Every non-finite result is replaced by 0.0,
            which covers groups whose std is 0 or NaN (e.g. single-member
            groups) as well as rows whose input value is NaN.
        """
        frame = pd.DataFrame({"x": series, "g": groups})
        grouped = frame.groupby("g")["x"]
        # transform() keeps the per-group statistics aligned with the rows.
        means = grouped.transform("mean")
        stds = grouped.transform("std")
        z = (series - means) / stds
        return z.mask(~np.isfinite(z), 0.0)

    def _compute_growth_potential(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute appreciation potential from price-per-sqm vs the peer average.

        The peer group is always (town, flat_type) as named by the schema.
        A unit priced well below its peer average is labelled "High"
        potential, one at or above the average "Low".

        Parameters
        ----------
        df : pd.DataFrame
            Input DataFrame, ideally with the schema's resale price, floor
            area, town and flat type columns (modified in place).

        Returns
        -------
        pd.DataFrame
            The same frame with added columns:
            - price_per_sqm: Unit price per square meter
            - town_avg_psm: Average PSM for (town, flat_type) peer group
            - psm_ratio: Unit PSM / Town Avg PSM
            - growth_potential: "High" / "Moderate" / "Low", or "Unknown"
              when the ratio is NaN or a required column is missing
        """
        s = self.schema
        value_cols = [s.resale_price, s.floor_area]
        group_cols = [s.town, s.flat_type]
        missing = [c for c in value_cols + group_cols if c not in df.columns]

        if all(c in df.columns for c in value_cols):
            with np.errstate(divide="ignore", invalid="ignore"):
                psm = df[s.resale_price] / df[s.floor_area]
            df["price_per_sqm"] = psm.mask(~np.isfinite(psm), np.nan)
        elif "price_per_sqm" not in df.columns:
            # Price/area inputs are absent: expose the column as missing
            # instead of raising KeyError, mirroring the group-column fallback.
            df["price_per_sqm"] = np.nan

        if missing:
            self.logger.warning(
                "Missing columns for growth potential; setting to Unknown"
            )
            df["town_avg_psm"] = np.nan
            df["psm_ratio"] = np.nan
            df["growth_potential"] = "Unknown"
            return df

        # Peer average PSM per (town, flat_type), aligned row-by-row.
        df["town_avg_psm"] = df.groupby(group_cols)["price_per_sqm"].transform("mean")

        with np.errstate(divide="ignore", invalid="ignore"):
            ratio = df["price_per_sqm"] / df["town_avg_psm"]
        df["psm_ratio"] = ratio.mask(~np.isfinite(ratio), np.nan)

        # High:     ratio < 0.85          (Deep Value)
        # Moderate: 0.85 <= ratio < 1.0   (Fair Value)
        # Low:      ratio >= 1.0          (Premium)
        # NaN ratios satisfy none of these and fall through to "Unknown".
        conditions = [
            df["psm_ratio"] < 0.85,
            (df["psm_ratio"] >= 0.85) & (df["psm_ratio"] < 1.0),
            df["psm_ratio"] >= 1.0,
        ]
        choices = ["High", "Moderate", "Low"]
        df["growth_potential"] = np.select(conditions, choices, default="Unknown")

        self.logger.info(
            "Growth potential computed: %s",
            df["growth_potential"].value_counts().to_dict(),
        )
        return df

    def score(
        self, df: pd.DataFrame, group_by: list[str] | None = None
    ) -> pd.DataFrame:
        """Add Z-Score, Valuation Score, and Growth Potential columns to the DataFrame.

        Adds the following columns:
        - z_price_efficiency: group-wise Z-Score of price_efficiency within selected groups
        - valuation_score: -z_price_efficiency, so higher is more undervalued
        - price_per_sqm: Price per square meter
        - town_avg_psm: Average PSM for peer group (town, flat_type)
        - psm_ratio: Unit PSM / Town Average PSM
        - growth_potential: Categorical (High/Moderate/Low/Unknown) appreciation potential

        Parameters
        ----------
        df : pd.DataFrame
            Input DataFrame containing required columns (modified in place).
        group_by : Optional[List[str]]
            Column names to define peer groups for the z-score. Defaults to
            [town, flat_type]; an empty sequence also selects the default.
            Growth potential always uses (town, flat_type).

        Returns
        -------
        pd.DataFrame
            DataFrame with added score columns. If a grouping column or the
            price efficiency column is missing, only the z-score and
            valuation score columns are added, filled with NaN.
        """
        s = self.schema
        # list() lets callers pass any sequence (e.g. a tuple) of names.
        group_cols = list(group_by) if group_by else [s.town, s.flat_type]

        required = [*group_cols, s.price_efficiency]
        missing = [c for c in required if c not in df.columns]
        if missing:
            self.logger.error("Missing columns for valuation: %s", missing)
            df[s.z_price_efficiency] = np.nan
            df[s.valuation_score] = np.nan
            return df

        self.logger.info("Computing group-wise z-scores over groups: %s", group_cols)

        # One tuple key per row so any number of grouping columns collapses
        # into a single label series.
        keys = list(zip(*(df[c] for c in group_cols)))
        group_keys = pd.Series(keys, index=df.index, dtype="object")

        z = self._groupwise_zscore(df[s.price_efficiency], group_keys)
        df[s.z_price_efficiency] = z
        df[s.valuation_score] = -z

        # Growth potential (civic value metric).
        return self._compute_growth_potential(df)