"""Command-line interface and application orchestrator for Unsprawl.
This module contains:
- UnsprawlApp: Main orchestrator class that wires the entire pipeline
- CLI argument parser and main entry point
- Integration logic for all components
"""
# mypy: ignore-errors
from __future__ import annotations
import contextlib
import importlib.util
import logging
import os
import subprocess
import sys
import webbrowser
from pathlib import Path
import numpy as np
import pandas as pd
import typer
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from unsprawl.fetch import DEFAULT_HDB_PATH, DEFAULT_MRT_PATH, validate_mrt_schema
from unsprawl.geocoding import ensure_lat_lon_from_town_centroids
from unsprawl.loader import HDBLoader, Schema
from unsprawl.models import FeatureEngineer, ValuationEngine
from unsprawl.providers.data.sg.govsg import GovSGProvider
from unsprawl.reporter import ReportGenerator
from unsprawl.spatial import TransportScorer
from unsprawl.utils import (
__version__,
configure_logging,
find_open_port,
get_project_description,
)
# Shared Rich console: every CLI command in this module prints its panels,
# tables and status messages through this single instance.
console = Console()
[docs]
class UnsprawlApp:
    """Application orchestrator wiring the pipeline and providing both programmatic and
    CLI access.

    This class can be used directly as a Python module or via the CLI. For programmatic
    usage, call :meth:`process` with explicit parameters. The CLI reaches the same
    pipeline through the ``valuate`` command, which builds an instance and calls
    :meth:`process` and :meth:`render_rich_table`.

    Example (Module Usage)
    ----------------------
    >>> app = UnsprawlApp()
    >>> results = app.process(
    ...     input_path="resale.csv",
    ...     town="PUNGGOL",
    ...     budget=600000,
    ...     top_n=10
    ... )
    >>> print(results.head())

    Example (With MRT Accessibility - Default)
    ------------------------------------------
    >>> app = UnsprawlApp()
    >>> results = app.process(
    ...     input_path="resale.csv",
    ...     town="BISHAN"
    ... )

    Example (Custom MRT Catalog)
    ----------------------------
    >>> results = app.process(
    ...     input_path="resale.csv",
    ...     mrt_catalog="stations.geojson",
    ...     town="BISHAN"
    ... )
    """

    def __init__(
        self, schema: Schema | None = None, transport_cache_dir: str | None = None
    ) -> None:
        """Initialize the valuation engine with optional custom schema and cache
        directory.

        Parameters
        ----------
        schema : Schema | None
            Custom schema definition. If None, uses default Schema().
        transport_cache_dir : Optional[str]
            Directory handed to ``TransportScorer`` as ``cache_dir``. If None, the
            scorer falls back to its own default location.
        """
        self.schema = schema or Schema()
        self.loader = HDBLoader(self.schema)
        self.fe = FeatureEngineer(self.schema)
        self.engine = ValuationEngine(self.schema)
        self.transport = TransportScorer(cache_dir=transport_cache_dir)
        self.reporter = ReportGenerator(self.schema)
        self.logger = logging.getLogger(self.__class__.__name__)
        # Last frame loaded through load_data()/process(); reused by later calls.
        self._data: pd.DataFrame | None = None

    def load_data(self, input_path: str) -> pd.DataFrame:
        """Load HDB resale data from CSV file.

        The loaded frame is also remembered on the instance so that later
        :meth:`process` / :meth:`render_report` calls can reuse it.

        Parameters
        ----------
        input_path : str
            Path to the HDB resale CSV file.

        Returns
        -------
        pd.DataFrame
            Loaded and normalized DataFrame.

        Raises
        ------
        FileNotFoundError
            If the file does not exist.
        ValueError
            If the CSV cannot be parsed.
        """
        self._data = self.loader.load(input_path)
        return self._data

    def process(
        self,
        input_path: str | None = None,
        data: pd.DataFrame | None = None,
        mrt_catalog: str | None = None,
        clear_transport_cache: bool = False,
        group_by: list[str] | None = None,
        enable_accessibility_adjust: bool = True,
        # Filters
        town: str | None = None,
        town_like: str | None = None,
        budget: float | None = None,
        flat_type: str | None = None,
        flat_type_like: str | None = None,
        flat_model: str | None = None,
        flat_model_like: str | None = None,
        storey_min: int | None = None,
        storey_max: int | None = None,
        area_min: float | None = None,
        area_max: float | None = None,
        lease_min: float | None = None,
        lease_max: float | None = None,
        top_n: int = 10,
        return_full: bool = False,
    ) -> pd.DataFrame:
        """Process HDB resale data and return filtered, scored results.

        This is the main programmatic entry point for using the valuation engine as a
        module. The steps are: resolve the input frame, engineer features, optionally
        score MRT accessibility, optionally fold accessibility into price efficiency,
        compute valuation scores, then filter through the reporter.

        Parameters
        ----------
        input_path : Optional[str]
            Path to HDB resale CSV. Required if `data` is not provided.
        data : Optional[pd.DataFrame]
            Pre-loaded DataFrame. If provided, `input_path` is ignored.
        mrt_catalog : Optional[str]
            Path to MRT stations GeoJSON or CSV for transport scoring. ``None`` uses
            the project's default catalog when it validates; an empty/blank string
            disables transport scoring.
        clear_transport_cache : bool
            Whether to clear transport cache before processing. Only honoured when a
            catalog is actually used.
        group_by : Optional[List[str]]
            Columns to group by for peer comparison z-scores. Defaults to [town, flat_type].
            Names are normalized to lower snake case before use.
        enable_accessibility_adjust : bool
            Whether to adjust price efficiency based on MRT accessibility. Default True.
        town : Optional[str]
            Exact town filter (case-insensitive).
        town_like : Optional[str]
            Partial town match (substring).
        budget : Optional[float]
            Maximum resale price.
        flat_type : Optional[str]
            Exact flat type filter.
        flat_type_like : Optional[str]
            Partial flat type match.
        flat_model : Optional[str]
            Exact flat model filter.
        flat_model_like : Optional[str]
            Partial flat model match.
        storey_min : Optional[int]
            Minimum storey number.
        storey_max : Optional[int]
            Maximum storey number.
        area_min : Optional[float]
            Minimum floor area (sqm).
        area_max : Optional[float]
            Maximum floor area (sqm).
        lease_min : Optional[float]
            Minimum remaining lease (years).
        lease_max : Optional[float]
            Maximum remaining lease (years).
        top_n : int
            Number of top results to return. Default 10.
        return_full : bool
            If True, return all filtered results instead of just top_n.

        Returns
        -------
        pd.DataFrame
            Filtered and scored results, sorted by valuation_score descending.

        Raises
        ------
        ValueError
            If neither input_path nor data is provided and the default dataset path is not available.
        FileNotFoundError
            If input_path does not exist.

        Notes
        -----
        Transport scoring is best-effort: any error raised while scoring is logged as
        a warning and the pipeline continues without accessibility data. Rows whose
        accessibility score is missing or non-numeric keep their unadjusted price
        efficiency.

        Examples
        --------
        >>> app = UnsprawlApp()
        >>> results = app.process(
        ...     input_path="resale.csv",
        ...     town="PUNGGOL",
        ...     budget=600000,
        ...     top_n=5
        ... )
        >>> print(f"Found {len(results)} undervalued properties")
        """
        df = self._resolve_frame(input_path, data)

        # Feature engineering
        df = self.fe.parse_remaining_lease(df)
        df = self.fe.compute_price_efficiency(df)

        # Transport scoring (skipped entirely when no catalog resolves)
        catalog = self._resolve_mrt_catalog(mrt_catalog)
        if catalog:
            df = self._score_transport(df, catalog, clear_transport_cache)

        # Normalize group_by columns
        if group_by:
            group_by = [g.strip().lower().replace(" ", "_") for g in group_by]

        # Integrate accessibility into valuation
        if enable_accessibility_adjust and "Accessibility_Score" in df.columns:
            divisor = self._accessibility_divisor(df["Accessibility_Score"])
            with np.errstate(divide="ignore", invalid="ignore"):
                df[self.schema.price_efficiency] = (
                    df[self.schema.price_efficiency] / divisor
                )

        # Compute valuation scores
        df = self.engine.score(df, group_by=group_by)

        # Filter and return results
        return self.reporter.generate_dataframe(
            df,
            town=town,
            town_like=town_like,
            budget=budget,
            flat_type=flat_type,
            flat_type_like=flat_type_like,
            flat_model=flat_model,
            flat_model_like=flat_model_like,
            storey_min=storey_min,
            storey_max=storey_max,
            area_min=area_min,
            area_max=area_max,
            lease_min=lease_min,
            lease_max=lease_max,
            top_n=top_n,
            full=return_full,
        )

    def _resolve_frame(
        self, input_path: str | None, data: pd.DataFrame | None
    ) -> pd.DataFrame:
        """Pick the working frame: explicit data, then path, then cache, then default file."""
        if data is not None:
            self._data = data.copy()
            return self._data
        if input_path is not None:
            return self.load_data(input_path)
        if self._data is not None:
            return self._data.copy()
        # Module-friendly default: fall back to the packaged default dataset path
        # (if present) so that `app.process(town=..., budget=...)` works out-of-the-box.
        if os.path.exists(DEFAULT_HDB_PATH):
            return self.load_data(DEFAULT_HDB_PATH)
        raise ValueError(
            "Either input_path or data must be provided, or data must be pre-loaded via load_data()"
        )

    @staticmethod
    def _resolve_mrt_catalog(mrt_catalog: str | None) -> str | None:
        """Map the user-facing catalog argument to a usable path, or None to skip scoring."""
        if mrt_catalog is None:
            # Default catalog is only used when it passes schema validation.
            return DEFAULT_MRT_PATH if validate_mrt_schema(DEFAULT_MRT_PATH) else None
        if str(mrt_catalog).strip() == "":
            # Blank string is the explicit "disable transport scoring" switch.
            return None
        return mrt_catalog

    def _score_transport(
        self, df: pd.DataFrame, catalog: str, clear_cache: bool
    ) -> pd.DataFrame:
        """Add accessibility columns to ``df``; on any error log a warning and return what we have."""
        try:
            if clear_cache:
                self.transport.clear_cache()
            # Ensure we have coordinates for scoring.
            df = ensure_lat_lon_from_town_centroids(df)
            if str(catalog).lower().endswith(".geojson"):
                self.transport.load_stations_geojson(catalog)
            else:
                self.transport.load_stations(pd.read_csv(catalog))
            df = self.transport.calculate_accessibility_score(df)
            # Mirror the score under a lower-case column name when it is not there yet.
            if (
                "Accessibility_Score" in df.columns
                and "accessibility_score" not in df.columns
            ):
                df["accessibility_score"] = df["Accessibility_Score"]
        except Exception as exc:  # noqa: BLE001 - scoring is deliberately best-effort
            self.logger.warning("Transport scoring skipped due to error: %s", exc)
        return df

    @staticmethod
    def _accessibility_divisor(scores: pd.Series) -> pd.Series:
        """Return ``1 + score / 10`` per row, using 1.0 (no adjustment) for missing scores."""
        divisor = 1.0 + pd.to_numeric(scores, errors="coerce") / 10.0
        # A missing/unparseable score must not turn a valid price efficiency into NaN.
        return divisor.fillna(1.0)

    def render_report(
        self,
        data: pd.DataFrame | None = None,
        town: str | None = None,
        town_like: str | None = None,
        budget: float | None = None,
        flat_type: str | None = None,
        flat_type_like: str | None = None,
        flat_model: str | None = None,
        flat_model_like: str | None = None,
        storey_min: int | None = None,
        storey_max: int | None = None,
        area_min: float | None = None,
        area_max: float | None = None,
        lease_min: float | None = None,
        lease_max: float | None = None,
        top_n: int = 10,
    ) -> str:
        """Render a formatted string report from processed data.

        Parameters
        ----------
        data : Optional[pd.DataFrame]
            Pre-processed DataFrame with scores. If None, uses internally stored data.
        top_n : int
            Number of results to include in report.

        Notes
        -----
        This method accepts the same filter arguments as :meth:`process`; they are
        forwarded unchanged to the reporter.

        Returns
        -------
        str
            Formatted table string ready for console output.

        Raises
        ------
        ValueError
            If `data` is None and nothing has been loaded on this instance yet.
        """
        if data is None:
            if self._data is None:
                raise ValueError(
                    "No data available. Call process() or load_data() first."
                )
            data = self._data
        return self.reporter.render(
            data,
            town=town,
            town_like=town_like,
            budget=budget,
            flat_type=flat_type,
            flat_type_like=flat_type_like,
            flat_model=flat_model,
            flat_model_like=flat_model_like,
            storey_min=storey_min,
            storey_max=storey_max,
            area_min=area_min,
            area_max=area_max,
            lease_min=lease_min,
            lease_max=lease_max,
            top_n=top_n,
        )

    @staticmethod
    def _display_text(value: object, default: str = "") -> str:
        """Return ``str(value)``, or ``default`` when the value is None/NaN/NA."""
        try:
            if bool(pd.isna(value)):
                return default
        except (TypeError, ValueError):
            # Non-scalar values have no single "missing" answer; show them as-is.
            pass
        return str(value)

    @staticmethod
    def _format_number(value: object, spec: str, prefix: str = "") -> str:
        """Format a numeric cell with ``spec``; missing values become ``"N/A"``."""
        return f"{prefix}{value:{spec}}" if pd.notna(value) else "N/A"

    @staticmethod
    def _row_style(score: object) -> str:
        """Pick the Rich row style from the valuation score (higher is greener)."""
        if not pd.notna(score):
            return "dim"
        if score >= 2.0:
            return "bold green"
        if score >= 1.0:
            return "green"
        if score >= 0:
            return "yellow"
        return "dim"

    def render_rich_table(
        self,
        df: pd.DataFrame,
        title: str = "🏠 Top Undervalued Residential Properties",
    ) -> Table:
        """Render a Rich table from results DataFrame.

        Parameters
        ----------
        df : pd.DataFrame
            Results DataFrame with valuation scores. The "Rank" column is derived
            from the index label (label + 1 for integer labels), so callers that
            want 1..N ranks should pass a frame with a fresh 0-based index.
        title : str
            Table title.

        Returns
        -------
        rich.table.Table
            Formatted Rich table ready for console output.
        """
        table = Table(
            title=title,
            show_header=True,
            header_style="bold magenta",
            border_style="bright_blue",
        )
        # Add columns with appropriate styling
        table.add_column("Rank", style="cyan", justify="right", width=6)
        table.add_column("Town", style="green")
        table.add_column("Flat Type", style="yellow")
        table.add_column("Address", style="white", max_width=30)
        table.add_column("Price", style="bright_green", justify="right")
        table.add_column("Area (m²)", style="cyan", justify="right")
        table.add_column("Lease (yrs)", style="yellow", justify="right")
        table.add_column("Score", style="bold bright_cyan", justify="right")
        # Add optional accessibility column if present
        has_accessibility = "Accessibility_Score" in df.columns
        if has_accessibility:
            table.add_column("MRT Access", style="magenta", justify="right")

        # Populate table rows
        # TODO(optimization): Refactor to avoid Python-level row iteration in the hot path.
        # Current row-wise rendering is acceptable for Top-N tables, but will bottleneck at scale when exporting or enriching
        # many rows. Prefer vectorized Pandas ops and batched spatial queries (KDTree) where applicable.
        for idx, row in df.iterrows():
            rank = str(idx + 1) if isinstance(idx, int) else str(idx)
            # Missing text cells fall back to a placeholder instead of the literal "nan".
            town = self._display_text(row.get(self.schema.town), "N/A")
            flat_type = self._display_text(row.get(self.schema.flat_type), "N/A")
            # Try to build address from block and street_name columns if they exist
            block = self._display_text(row.get("block"))
            street = self._display_text(row.get("street_name"))
            address = (
                (block + " " + street).strip()[:30] if block or street else "N/A"
            )
            score = row.get(self.schema.valuation_score, 0)
            cells = [
                rank,
                town,
                flat_type,
                address,
                self._format_number(row.get(self.schema.resale_price, 0), ",.0f", "$"),
                self._format_number(row.get(self.schema.floor_area, 0), ".1f"),
                self._format_number(
                    row.get(self.schema.remaining_lease_years, 0), ".1f"
                ),
                self._format_number(score, ".2f"),
            ]
            if has_accessibility:
                cells.append(
                    self._format_number(row.get("Accessibility_Score", 0), ".1f")
                )
            # Color code based on score
            table.add_row(*cells, style=self._row_style(score))
        return table
# Typer application object; the functions below register themselves on it via
# @app.command(...). Shell-completion installation options are disabled.
app = typer.Typer(add_completion=False, help=get_project_description())
# Alias to avoid any potential name shadowing inside functions
# (main() rebinds a local name `app` from this alias).
_TY_APP = app
# ------------------------------- Typer Commands -------------------------------
[docs]
@app.command("docs")
def cmd_docs(
    port: int = typer.Option(8000, help="Docs server port"),
    open_browser: bool = typer.Option(True, help="Open browser after launch"),
) -> int:
    """Launch Sphinx documentation server with live reload."""
    # Returns the exit status of the last subprocess that ran, or 4 when launching
    # fails with an exception. Prefers sphinx-autobuild when it is importable and
    # otherwise does a one-off Sphinx build followed by a plain http.server.
    port = find_open_port(port)

    def banner(text: str) -> None:
        # Same framed headline for each stage.
        console.print(Panel.fit(text, border_style="bright_blue"))

    def launch_browser() -> None:
        # Opening the browser is a convenience only; failures are ignored.
        if open_browser:
            with contextlib.suppress(Exception):
                webbrowser.open(f"http://localhost:{port}")

    try:
        source_dir = str(Path("docs") / "source")
        html_dir = str(Path("docs") / "build" / "html")
        python = sys.executable

        if importlib.util.find_spec("sphinx_autobuild") is not None:
            autobuild = [
                python,
                "-m",
                "sphinx_autobuild",
                source_dir,
                html_dir,
                "--port",
                str(port),
            ]
            banner("[bold cyan]sphinx-autobuild (live reload)[/bold cyan]")
            launch_browser()
            return subprocess.call(autobuild)

        # Fallback path: static build, then serve the output directory.
        banner("[bold cyan]Building docs (sphinx)[/bold cyan]")
        status = subprocess.call(
            [python, "-m", "sphinx", "-b", "html", source_dir, html_dir]
        )
        if status != 0:
            return status
        banner("[bold cyan]Serving docs (http.server)[/bold cyan]")
        launch_browser()
        return subprocess.call(
            [python, "-m", "http.server", str(port), "-d", html_dir]
        )
    except Exception as exc:  # noqa: BLE001
        console.print(f"[red]✗ Docs launch failed: {exc}[/red]")
        logging.getLogger("docs").exception("Docs launch failed: %s", exc)
        return 4
[docs]
@app.command("cache")
def cmd_cache(
    clear: bool = typer.Option(False, help="Clear transport cache and exit"),
    transport_cache_dir: str | None = typer.Option(
        None, help="Transport cache directory"
    ),
) -> int:
    """Manage transport scoring cache."""
    # With --clear the cache is wiped; otherwise the cache directory is reported
    # through the "Cache" logger at INFO level. Always returns 0.
    scorer = TransportScorer(cache_dir=transport_cache_dir)
    if not clear:
        cache_logger = logging.getLogger("Cache")
        cache_logger.info("Transport cache dir: %s", scorer._cache_dir)
        return 0
    scorer.clear_cache()
    return 0
[docs]
@app.command("fetch")
def cmd_fetch(
    region: str = typer.Option("SG", help="Region (currently SG)"),
    limit: int = typer.Option(5000, help="Synthetic fallback row cap"),
    datasets: str = typer.Option("all", help="Datasets: all|resale|mrt"),
    force: bool = typer.Option(False, help="Force re-download/regenerate"),
) -> int:
    """Fetch and cache regional datasets."""
    # Returns 0 after the requested datasets have been fetched.
    # Raises ValueError for an unsupported region or an unknown `datasets` value.
    if region != "SG":
        raise ValueError("Only region=SG is supported for fetch in this version")
    # Reject typos up front: previously an unknown value fetched nothing and still
    # reported success.
    if datasets not in ("all", "resale", "mrt"):
        raise ValueError(
            f"Unsupported datasets value {datasets!r}; expected one of: all, resale, mrt"
        )

    provider = GovSGProvider()
    if datasets in ("all", "resale"):
        provider.fetch_resale_prices(limit=limit, force=force)
        console.print(
            f"[green]✓[/green] Resale prices cached at [cyan]{provider.resale_prices_path}[/cyan]"
        )
    if datasets in ("all", "mrt"):
        provider.fetch_mrt_exits(force=force)
        console.print(
            f"[green]✓[/green] MRT exits cached at [cyan]{provider.mrt_exits_path}[/cyan]"
        )
    return 0
[docs]
@app.command("advect")
def cmd_advect(
    agents: int = 100000,
    steps: int = 600,
    grid_res: int = 256,
    device: str | None = None,
    amplitude: float = 1.0,
    frequency: float = 3.0,
    phase: float = 0.0,
    dt: float = 1.0 / 60.0,
    output: str | None = None,
    output_format: str = "npy",
) -> int:
    """Run GPU-accelerated agent advection simulation."""
    # Return codes: 0 on success, 2 when an import is missing (simulation backend
    # or parquet engine), 4 on any other failure.
    # `output_format` is matched case-insensitively: "npy" and "csv" select those
    # writers, any other value is written as parquet.
    try:
        import time

        # Imported lazily so the rest of the CLI works without the simulation extra.
        from simulation import AdvectEngine, AdvectParams

        console.print(
            Panel.fit(
                "[bold cyan]Project Advect[/bold cyan]\n[dim]GPU-accelerated agent simulation (Warp)[/dim]",
                border_style="bright_blue",
            )
        )
        params = AdvectParams(dt=float(dt))
        engine = AdvectEngine(
            num_agents=int(agents),
            grid_res=int(grid_res),
            domain_min=(0.0, 0.0),
            domain_max=(1.0, 1.0),
            device=device,
            params=params,
            seed=0,
        )
        engine.generate_flow_field_curl(
            amplitude=float(amplitude), frequency=float(frequency), phase=float(phase)
        )

        # Untimed warm-up steps run before the measured section (skipped for very
        # short runs).
        warmup = 10 if steps >= 10 else 0
        if warmup:
            engine.step(warmup)

        t0 = time.perf_counter()
        engine.step(int(steps))
        t1 = time.perf_counter()
        # Clamp so the throughput divisions below can never divide by zero.
        elapsed = max(t1 - t0, 1e-12)
        steps_per_sec = steps / elapsed
        agent_updates_per_sec = (steps * engine.num_agents) / elapsed

        perf = Table(title="Advect Performance")
        perf.add_column("Metric")
        perf.add_column("Value", justify="right")
        perf.add_row("Device", str(engine.device))
        perf.add_row("Agents", f"{engine.num_agents:,}")
        perf.add_row("Steps", str(steps))
        perf.add_row("Elapsed (s)", f"{elapsed:,.3f}")
        perf.add_row("Steps/s", f"{steps_per_sec:,.2f}")
        perf.add_row("Agent-updates/s", f"{agent_updates_per_sec:,.0f}")
        console.print(perf)

        if output:
            # Normalise like the valuate command does, so "CSV"/"NPY" are honoured
            # instead of silently falling through to parquet.
            fmt = str(output_format or "").strip().lower()
            pos = engine.positions_numpy().astype(np.float32, copy=False)
            if fmt == "npy":
                np.save(output, pos)
            else:
                frame = pd.DataFrame(pos, columns=["x", "y"])
                if fmt == "csv":
                    frame.to_csv(output, index=False)
                else:
                    try:
                        frame.to_parquet(output, index=False)
                    except ImportError as parq_exc:
                        raise ImportError(
                            "Parquet export requires `pyarrow` or `fastparquet`. Install with: pip install pyarrow"
                        ) from parq_exc
            console.print(f"[green]✓[/green] Wrote positions to [cyan]{output}[/cyan]")
        return 0
    except ImportError as exc:
        console.print(f"[red]{exc}[/red]")
        return 2
    except Exception as exc:  # noqa: BLE001
        console.print(f"[red]✗ Advect failed: {exc}[/red]")
        logging.getLogger("advect").exception("Advect failed: %s", exc)
        return 4
[docs]
@app.command("advect-replay")
def cmd_advect_replay(
    agents: int = 100000,
    frames: int = 60,
    steps_per_frame: int = 5,
    grid_res: int = 256,
    device: str | None = None,
    out_dir: str = str(Path("dashboard") / "data" / "advect_frames"),
    stride: int = 10,
    parquet: str | None = None,
    fps: float = 10.0,
) -> int:
    """Generate replay frames for Streamlit dashboard."""
    # Return codes: 0 on success, 2 when an import is missing, 4 on any other
    # failure. Each frame advances the simulation by `steps_per_frame`, keeps
    # every `stride`-th agent and rescales its position into a fixed lon/lat box.
    try:
        import numpy as _np
        from dashboard.utils.advect import (
            AdvectFrame,
            write_replay_frames,
            write_replay_parquet,
        )
        from simulation import AdvectEngine

        console.print(
            Panel.fit(
                "[bold cyan]Advect Replay Generator[/bold cyan]\n[dim]Generate JSON frames for Streamlit Replay Mode[/dim]",
                border_style="bright_blue",
            )
        )
        target_dir = Path(out_dir)
        engine = AdvectEngine(
            num_agents=int(agents), grid_res=int(grid_res), device=device
        )
        engine.generate_flow_field_curl(amplitude=1.0, frequency=3.0, phase=0.0)

        # Fixed bounding box that simulation coordinates are rescaled into.
        lon_min, lon_max = 103.60, 104.10
        lat_min, lat_max = 1.20, 1.48
        lon_span = lon_max - lon_min
        lat_span = lat_max - lat_min

        collected: list[AdvectFrame] = []
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            task_id = progress.add_task("[cyan]Simulating frames...", total=int(frames))
            for _frame_no in range(int(frames)):
                engine.step(int(steps_per_frame))
                # Thin the agent set; a stride below 1 is treated as 1.
                sample = engine.positions_numpy()[:: max(1, int(stride))]
                lon = lon_min + sample[:, 0] * lon_span
                lat = lat_min + sample[:, 1] * lat_span
                frame = AdvectFrame(
                    lon=_np.asarray(lon, dtype=_np.float32),
                    lat=_np.asarray(lat, dtype=_np.float32),
                )
                collected.append(frame)
                progress.advance(task_id)

        written = write_replay_frames(collected, target_dir)
        if parquet:
            write_replay_parquet(collected, Path(parquet), fps=float(fps))

        summary = Table(title="Replay Frames")
        summary.add_column("Output")
        summary.add_column("Count", justify="right")
        summary.add_row(str(target_dir), str(len(written)))
        if parquet:
            summary.add_row("parquet", str(parquet))
        console.print(summary)
        console.print(f"[green]✓[/green] Wrote frames to [cyan]{target_dir}[/cyan]")
        return 0
    except ImportError as exc:
        console.print(
            f"[red]{exc}[/red]\n[dim]Tip: install advect extra: pip install 'unsprawl[advect]'[/dim]"
        )
        return 2
    except Exception as exc:  # noqa: BLE001
        console.print(f"[red]✗ advect-replay failed: {exc}[/red]")
        logging.getLogger("advect-replay").exception("advect-replay failed: %s", exc)
        return 4
[docs]
@app.command("valuate")
def cmd_valuate(
    region: str = typer.Option("SG", help="Region code"),
    input_path: str | None = typer.Option(None, "--input", help="Path to resale CSV"),
    mrt_catalog: str | None = typer.Option(
        None, help="MRT GeoJSON/CSV path (or blank to skip)"
    ),
    transport_cache_dir: str | None = typer.Option(
        None, help="Transport cache directory"
    ),
    clear_transport_cache: bool = typer.Option(
        False, help="Clear cache before scoring"
    ),
    town: str | None = typer.Option(None, help="Exact town filter"),
    town_like: str | None = typer.Option(None, help="Substring town filter"),
    flat_type: str | None = typer.Option(None, help="Exact flat type"),
    flat_type_like: str | None = typer.Option(None, help="Substring flat type"),
    flat_model: str | None = typer.Option(None, help="Exact flat model"),
    flat_model_like: str | None = typer.Option(None, help="Substring flat model"),
    storey_min: int | None = typer.Option(None, help="Minimum storey"),
    storey_max: int | None = typer.Option(None, help="Maximum storey"),
    area_min: float | None = typer.Option(None, help="Minimum area"),
    area_max: float | None = typer.Option(None, help="Maximum area"),
    lease_min: float | None = typer.Option(None, help="Minimum lease years"),
    lease_max: float | None = typer.Option(None, help="Maximum lease years"),
    budget: float | None = typer.Option(None, help="Maximum price"),
    group_by: str | None = typer.Option(
        None, help="Group-by columns for z-scores (comma-separated)"
    ),
    top: int = typer.Option(10, help="Top-N to display"),
    no_accessibility_adjust: bool = typer.Option(
        False, help="Do not adjust price_efficiency"
    ),
    output: str | None = typer.Option(
        None, help="Optional export path (csv/json/parquet)"
    ),
    export_full: bool = typer.Option(False, help="Export full filtered dataset"),
    output_format: str = typer.Option("csv", help="Export format: csv|json|parquet"),
) -> int:
    """Run property valuation and scoring pipeline."""
    # Return codes: 0 on success, 4 when the pipeline or export fails.
    # Raises ValueError for an unsupported region (before any work is done).
    if region != "SG":
        raise ValueError("Only --region SG is supported for valuate in this version")
    appx = UnsprawlApp(schema=Schema(), transport_cache_dir=transport_cache_dir)
    configure_logging(0)
    try:
        console.print(
            Panel.fit(
                f"[bold cyan]Unsprawl v{__version__}[/bold cyan]\n[dim]Valuation & Transport Scoring[/dim]",
                border_style="bright_blue",
            )
        )

        # Parse group_by: comma-separated when a comma is present (names may then
        # contain spaces), otherwise whitespace-separated.
        group_by_list = None
        if group_by:
            separator = "," if "," in group_by else None
            group_by_list = [
                part.strip() for part in group_by.split(separator) if part.strip()
            ]

        # Ask the pipeline for the full filtered set only when it will be exported.
        # The export must come from the scored pipeline output: the frame cached on
        # the app is assigned from the raw load, so re-filtering it would not be
        # guaranteed to contain valuation scores.
        want_full = bool(output) and export_full
        results = appx.process(
            input_path=input_path,
            mrt_catalog=mrt_catalog,
            clear_transport_cache=clear_transport_cache,
            group_by=group_by_list,
            enable_accessibility_adjust=not no_accessibility_adjust,
            town=town,
            town_like=town_like,
            budget=budget,
            flat_type=flat_type,
            flat_type_like=flat_type_like,
            flat_model=flat_model,
            flat_model_like=flat_model_like,
            storey_min=storey_min,
            storey_max=storey_max,
            area_min=area_min,
            area_max=area_max,
            lease_min=lease_min,
            lease_max=lease_max,
            top_n=top,
            return_full=want_full,
        )

        # Fresh 0-based index so the table's rank column reads 1..N.
        display_df = results.head(top).reset_index(drop=True)
        if len(display_df) > 0:
            rich_table = appx.render_rich_table(
                display_df,
                title=f"🏠 Top {len(display_df)} Undervalued Residential Properties",
            )
            console.print(rich_table)
        else:
            console.print("[yellow]No properties found matching the criteria.[/yellow]")

        # Optional export
        if output:
            # `results` is the top-N frame, or the full filtered frame with --export-full.
            export_df = results
            fmt = (output_format or "csv").lower()
            if fmt == "csv":
                export_df.to_csv(output, index=False)
            elif fmt == "json":
                export_df.to_json(output, orient="records", lines=False)
            else:
                try:
                    export_df.to_parquet(output, index=False)
                except Exception as parq_exc:  # noqa: BLE001
                    # Best-effort: still leave the user with a usable file.
                    console.print(
                        f"[yellow]⚠ Parquet export failed, falling back to CSV: {parq_exc}[/yellow]"
                    )
                    export_df.to_csv(output, index=False)
            console.print(f"[green]✓[/green] Exported to [cyan]{output}[/cyan]")
        return 0
    except Exception as exc:  # noqa: BLE001
        console.print(f"[red]✗ Valuate failed: {exc}[/red]")
        logging.getLogger("valuate").exception("Valuate failed: %s", exc)
        return 4
[docs]
@app.command()
def showcase(
    scenario: str = typer.Argument("genesis", help="Simulation scenario to load"),
    dev: bool = typer.Option(True, help="Run in dev mode (hot reload)"),
) -> None:
    """
    🚀 Launch the Full Stack Showcase (Backend + Frontend).
    """
    # Starts uvicorn (backend) and `npm run dev` (frontend) as child processes and
    # blocks until both exit. Ctrl+C terminates both and exits with status 0.
    # Exits with status 1 when no ./web directory exists in the working directory.
    # `scenario` is currently only echoed in the launch banner.
    import time  # needed for the shutdown grace period; not imported at module level

    console.print(f"[bold green]🚀 Launching Unsprawl: {scenario}[/bold green]")

    # Check if web directory exists
    web_dir = Path.cwd() / "web"
    if not web_dir.exists():
        console.print("[red]Error: 'web' directory not found. Please run from project root.[/red]")
        raise typer.Exit(code=1)

    # 1. Start Backend (Advect/API)
    console.print("[yellow]Starting Backend (FastAPI + Warp)...[/yellow]")
    backend_cmd = [sys.executable, "-m", "uvicorn", "unsprawl.api:app"]
    if dev:
        backend_cmd.append("--reload")
    backend_cmd += ["--host", "0.0.0.0", "--port", "8000"]
    # We use Popen to keep it running
    backend_proc = subprocess.Popen(backend_cmd)

    # 2. Start Frontend (Vite)
    console.print("[cyan]Starting Frontend (Vite)...[/cyan]")
    try:
        frontend_proc = subprocess.Popen(["npm", "run", "dev"], cwd=web_dir)
    except OSError:
        # e.g. npm is not installed: do not leave the backend running orphaned.
        backend_proc.terminate()
        raise

    console.print("\n[bold]System Online.[/bold]")
    console.print("Backend: http://localhost:8000")
    console.print("Frontend: http://localhost:5173 (or 5174)\n")
    console.print("[dim]Press Ctrl+C to stop all services...[/dim]")
    try:
        backend_proc.wait()
        frontend_proc.wait()
    except KeyboardInterrupt:
        console.print("\n[red]Stopping services...[/red]")
        children = (backend_proc, frontend_proc)
        for proc in children:
            proc.terminate()
        # Give them a moment to die gracefully
        time.sleep(1)
        for proc in children:
            if proc.poll() is None:
                proc.kill()
        sys.exit(0)
[docs]
def main(argv: list[str] | None = None) -> int:
    """Entry point that dispatches to Typer app.

    Keeps return code semantics for tests, and supports legacy calls without a subcommand
    by defaulting to the `valuate` command when argv starts with flags.

    Parameters
    ----------
    argv : list[str] | None
        Command-line tokens without the program name. When None, the Typer app
        reads the process arguments itself.

    Returns
    -------
    int
        0 when the app returns normally or exits with an empty/zero code; otherwise
        the ``SystemExit`` code converted to an int, or 1 when that code is not
        convertible (for example a message string).
    """
    # Legacy behavior: a leading option means "no subcommand given" -> run valuate.
    if argv and str(argv[0]).startswith("-"):
        argv = ["valuate", *argv]

    # Bind Typer app alias locally to avoid any shadowing
    app = _TY_APP
    try:
        if argv is None:
            app()
        else:
            # Allow passing argv for testing by temporarily patching sys.argv
            saved_argv = sys.argv
            sys.argv = [saved_argv[0], *argv]
            try:
                app()
            finally:
                sys.argv = saved_argv
        return 0
    except SystemExit as exc:
        code = exc.code
        if not code:
            return 0
        try:
            return int(code)
        except (TypeError, ValueError):
            # SystemExit may carry a message instead of a number; report failure.
            return 1