Source code for unsprawl.reporter

"""Reporting module for filtering and presenting valuation results.

This module handles filtering, ranking, and formatting of valuation results for display
and export.
"""

from __future__ import annotations

import logging

import numpy as np
import pandas as pd

from unsprawl.loader import Schema


[docs] class ReportGenerator: """Filter, rank, and render a clean buy list table to console. Filtering --------- - Optional exact/partial `town` filter. - Optional `budget` filter for maximum resale price. - Extended filters: flat_model, flat_type (exact or partial), storey_min/max, area_min/max, lease_min/max. Ranking ------- - Sort by `valuation_score` descending (highest implies most undervalued), with ties broken by lowest price_efficiency and then lowest resale_price. - Display the top N results (default: 10). Rendering --------- - Human-friendly table using pandas' built-in formatting. """ def __init__(self, schema: Schema | None = None) -> None: self.schema = schema or Schema() self.logger = logging.getLogger(self.__class__.__name__)
[docs] @staticmethod def _parse_storey_range(sr: str) -> tuple[int | None, int | None]: """Parse HDB storey range strings like "07 TO 09" into (min, max). Non-parsable inputs return (None, None). """ try: s = str(sr).strip() parts = s.split("TO") if "TO" in s else s.split("-") parts = [p.strip() for p in parts] if len(parts) == 2: return int(parts[0]), int(parts[1]) # Sometimes raw single like "10" or "01" v = int(s) return v, v except Exception: # noqa: BLE001 return None, None
[docs] def _apply_filters( self, df: pd.DataFrame, town: str | None = None, town_like: str | None = None, budget: float | None = None, flat_type: str | None = None, flat_type_like: str | None = None, flat_model: str | None = None, flat_model_like: str | None = None, storey_min: int | None = None, storey_max: int | None = None, area_min: float | None = None, area_max: float | None = None, lease_min: float | None = None, lease_max: float | None = None, ) -> pd.DataFrame: """Apply user-specified filters to DataFrame. Parameters ---------- df : pd.DataFrame The scored dataset. town : Optional[str] Town name for exact case-insensitive filtering. town_like : Optional[str] Substring case-insensitive match for town. budget : Optional[float] Maximum resale price. flat_type : Optional[str] Exact match filter for flat_type (case-insensitive). flat_type_like : Optional[str] Substring match for flat_type. flat_model : Optional[str] Exact match filter for flat_model. flat_model_like : Optional[str] Substring match for flat_model. storey_min, storey_max : Optional[int] Min/max storey number filter (overlap with storey_range). area_min, area_max : Optional[float] Floor area filters. lease_min, lease_max : Optional[float] Remaining lease (years) filters. """ s = self.schema out = df.copy() # Town filters if town: if s.town not in out.columns: self.logger.warning( "Town column '%s' not found; skipping town filter", s.town ) else: out = out[out[s.town].astype(str).str.lower() == town.strip().lower()] if town_like: if s.town not in out.columns: self.logger.warning( "Town column '%s' not found; skipping town_like filter", s.town ) else: out = out[ out[s.town] .astype(str) .str.contains(town_like, case=False, na=False) ] # Budget filter if budget is not None: if s.resale_price not in out.columns: self.logger.warning( "Resale price column '%s' not found; skipping budget filter", s.resale_price, ) else: out = out[ pd.to_numeric(out[s.resale_price], errors="coerce") <= float(budget) ] # Flat type filters if flat_type and s.flat_type in out.columns: out = out[ out[s.flat_type].astype(str).str.lower() == flat_type.strip().lower() ] if flat_type_like and s.flat_type in out.columns: out = out[ out[s.flat_type] .astype(str) .str.contains(flat_type_like, case=False, na=False) ] # Flat model filters if flat_model and "flat_model" in out.columns: out = out[ out["flat_model"].astype(str).str.lower() == flat_model.strip().lower() ] if flat_model_like and "flat_model" in out.columns: out = out[ out["flat_model"] .astype(str) .str.contains(flat_model_like, case=False, na=False) ] # Area filters if area_min is not None and s.floor_area in out.columns: out = out[ pd.to_numeric(out[s.floor_area], errors="coerce") >= float(area_min) ] if area_max is not None and s.floor_area in out.columns: out = out[ pd.to_numeric(out[s.floor_area], errors="coerce") <= float(area_max) ] # Lease filters if lease_min is not None and s.remaining_lease_years in out.columns: out = out[ pd.to_numeric(out[s.remaining_lease_years], errors="coerce") >= float(lease_min) ] if lease_max is not None and s.remaining_lease_years in out.columns: out = out[ pd.to_numeric(out[s.remaining_lease_years], errors="coerce") <= float(lease_max) ] # Storey overlap filters via storey_range string if ( storey_min is not None or storey_max is not None ) and "storey_range" in out.columns: mins, maxs = [], [] for v in out["storey_range"].tolist(): mn, mx = self._parse_storey_range(v) mins.append(mn) maxs.append(mx) out = out.assign( __sr_min=pd.Series(mins, dtype="Int64"), __sr_max=pd.Series(maxs, dtype="Int64"), ) if storey_min is not None: out = out[(out["__sr_max"].fillna(-np.inf) >= int(storey_min))] if storey_max is not None: out = out[(out["__sr_min"].fillna(np.inf) <= int(storey_max))] out = out.drop(columns=["__sr_min", "__sr_max"]) return out
[docs] def generate_dataframe( self, df: pd.DataFrame, town: str | None = None, town_like: str | None = None, budget: float | None = None, flat_type: str | None = None, flat_type_like: str | None = None, flat_model: str | None = None, flat_model_like: str | None = None, storey_min: int | None = None, storey_max: int | None = None, area_min: float | None = None, area_max: float | None = None, lease_min: float | None = None, lease_max: float | None = None, top_n: int = 10, full: bool = False, ) -> pd.DataFrame: """Produce the filtered, sorted DataFrame for display/export. If full is True, returns all rows after sorting; otherwise, returns the top_n rows. """ s = self.schema filtered = self._apply_filters( df, town=town, town_like=town_like, budget=budget, flat_type=flat_type, flat_type_like=flat_type_like, flat_model=flat_model, flat_model_like=flat_model_like, storey_min=storey_min, storey_max=storey_max, area_min=area_min, area_max=area_max, lease_min=lease_min, lease_max=lease_max, ) cols_pref = [ s.town, s.flat_type, "flat_model" if "flat_model" in filtered.columns else None, "storey_range" if "storey_range" in filtered.columns else None, s.resale_price, s.floor_area, s.remaining_lease_years, "price_per_sqm" if "price_per_sqm" in filtered.columns else None, s.price_efficiency, s.z_price_efficiency, s.valuation_score, "growth_potential" if "growth_potential" in filtered.columns else None, "psm_ratio" if "psm_ratio" in filtered.columns else None, "Nearest_MRT" if "Nearest_MRT" in filtered.columns else None, "Dist_m" if "Dist_m" in filtered.columns else None, ( "Accessibility_Score" if "Accessibility_Score" in filtered.columns else None ), ( "accessibility_score" if "accessibility_score" in filtered.columns else None ), ] cols_exist = [c for c in cols_pref if c is not None and c in filtered.columns] display = filtered[cols_exist].copy() if s.valuation_score in display.columns: display = display.dropna(subset=[s.valuation_score]) sort_cols: list[str] = [] ascending: list[bool] = [] if s.valuation_score in display.columns: sort_cols.append(s.valuation_score) ascending.append(False) if s.price_efficiency in display.columns: sort_cols.append(s.price_efficiency) ascending.append(True) if s.resale_price in display.columns: sort_cols.append(s.resale_price) ascending.append(True) if sort_cols: display = display.sort_values(by=sort_cols, ascending=ascending) # Round for presentation but keep raw dtype for export (we'll round a copy for printing) if full: return display return display.head(top_n)
[docs] def render( self, df: pd.DataFrame, town: str | None = None, town_like: str | None = None, budget: float | None = None, flat_type: str | None = None, flat_type_like: str | None = None, flat_model: str | None = None, flat_model_like: str | None = None, storey_min: int | None = None, storey_max: int | None = None, area_min: float | None = None, area_max: float | None = None, lease_min: float | None = None, lease_max: float | None = None, top_n: int = 10, ) -> str: """Generate the formatted table for the buy list. The table prioritizes the most undervalued units by sorting on `valuation_score` desc, breaking ties by `price_efficiency` asc and `resale_price` asc. """ s = self.schema self.logger.info( "Preparing report (filters: town=%s town_like=%s budget=%s)", town, town_like, budget, ) display = self.generate_dataframe( df, town=town, town_like=town_like, budget=budget, flat_type=flat_type, flat_type_like=flat_type_like, flat_model=flat_model, flat_model_like=flat_model_like, storey_min=storey_min, storey_max=storey_max, area_min=area_min, area_max=area_max, lease_min=lease_min, lease_max=lease_max, top_n=top_n, ).copy() # Round numeric columns for presentation for col, nd in [ (s.resale_price, 0), (s.floor_area, 2), (s.remaining_lease_years, 2), ("price_per_sqm", 2), (s.price_efficiency, 6), (s.z_price_efficiency, 3), (s.valuation_score, 3), ("psm_ratio", 3), ]: if col in display.columns: display[col] = pd.to_numeric(display[col], errors="coerce").round(nd) if display.empty: return "No results match the given filters or insufficient data to score." return str(display.to_string(index=False))