Source code for unsprawl.refinery

"""Autonomous ETL Agent using Gemini Thinking Mode + Code Execution.

This module implements the RefineryAgent that ingests raw CSV data,
uses Gemini's thinking capabilities to reason about schema mappings,
and generates Python transformation code via Code Execution.

Example
-------
>>> from unsprawl.refinery import RefineryAgent
>>> from pydantic import BaseModel
>>>
>>> class PropertyRecord(BaseModel):
...     address: str
...     price: float
...     area_sqm: float
>>>
>>> agent = RefineryAgent()
>>> records = await agent.ingest(
...     "https://example.com/raw_data.csv",
...     PropertyRecord
... )
"""

from __future__ import annotations

import json
import logging
import os
from typing import Any, TypeVar, cast

import google.genai as genai
from google.genai import types
from pydantic import BaseModel

logger = logging.getLogger(__name__)

T = TypeVar("T", bound=BaseModel)


class RefineryAgent:
    """Autonomous ETL agent that transforms raw CSV to structured Pydantic models.

    Uses Gemini's Thinking Mode for deep reasoning about column mappings and
    Code Execution to run the generated transformation scripts.

    Parameters
    ----------
    model : str
        The Gemini model to use. Defaults to gemini-2.5-flash for thinking support.
    api_key : str | None
        Google API key. If None, reads from the GOOGLE_API_KEY environment variable.
    thinking_budget : int
        Token budget for thinking. Higher values enable deeper reasoning.

    Notes
    -----
    - Requires the `google-genai` SDK (NOT the legacy `google.generativeai` package)
    - Code Execution runs in a sandboxed environment
    - Illustrative usage sketches appear at the end of this module
    """

    def __init__(
        self,
        model: str = "gemini-2.5-flash",
        api_key: str | None = None,
        thinking_budget: int = 8192,
    ) -> None:
        self.model = model
        self.thinking_budget = thinking_budget

        # Initialize the client
        api_key = api_key or os.environ.get("GOOGLE_API_KEY")
        if not api_key:
            raise ValueError(
                "API key required. Set GOOGLE_API_KEY environment variable "
                "or pass api_key parameter."
            )
        self.client = genai.Client(api_key=api_key)
    def _schema_to_prompt(self, schema_cls: type[BaseModel]) -> str:
        """Convert a Pydantic model to a prompt-friendly schema description."""
        schema = schema_cls.model_json_schema()
        return json.dumps(schema, indent=2)
    async def ingest(
        self,
        file_url: str,
        target_schema: type[T],
    ) -> list[dict[str, Any]]:
        """Ingest a CSV URL and transform it to match the target schema.

        Parameters
        ----------
        file_url : str
            URL to the raw CSV file to ingest.
        target_schema : type[BaseModel]
            Pydantic model defining the target data structure.

        Returns
        -------
        list[dict[str, Any]]
            List of records conforming to the target schema.

        Raises
        ------
        RuntimeError
            If code execution fails or returns invalid data.
        """
        schema_prompt = self._schema_to_prompt(target_schema)

        prompt = f"""You are a data engineering expert. Your task is to:

1. Download the CSV from: {file_url}
2. Analyze the column names and data types
3. Map them to this target schema:

```json
{schema_prompt}
```

4. Write a Python script that:
   - Downloads the CSV using requests
   - Parses it with pandas
   - Transforms/renames columns to match the target schema
   - Handles missing values appropriately
   - Returns the cleaned data as a JSON array

Think deeply about:
- Column name variations (e.g., "sqft" vs "area_sqm")
- Unit conversions if needed
- Data type coercions
- Null/missing value strategies

After reasoning, execute the transformation code and return ONLY the JSON array.
"""

        # Configure with Thinking Mode and Code Execution
        config = types.GenerateContentConfig(
            thinking_config=types.ThinkingConfig(
                thinking_budget=self.thinking_budget,
            ),
            tools=[
                types.Tool(code_execution=types.ToolCodeExecution()),
            ],
        )

        logger.info(f"Starting ingestion from {file_url}")

        response = await self.client.aio.models.generate_content(
            model=self.model,
            contents=prompt,
            config=config,
        )

        # Extract the result from the response
        if not response.candidates:
            raise RuntimeError("No response candidates returned from Gemini")

        # Look for code execution results. Check against None rather than
        # hasattr: response parts expose these attributes even when unset,
        # so hasattr alone would match every part.
        result_text = ""
        for part in response.candidates[0].content.parts:
            if getattr(part, "executable_code", None) is not None:
                logger.debug(f"Executed code: {part.executable_code.code[:200]}...")
            if getattr(part, "code_execution_result", None) is not None:
                result_text = part.code_execution_result.output
                logger.info("Code execution completed successfully")
            elif getattr(part, "text", None):
                result_text = part.text

        if not result_text:
            raise RuntimeError("No execution result or text returned")

        # Parse the JSON result
        try:
            # Try to extract JSON array from the response
            # Handle cases where the response contains markdown code blocks
            text = result_text.strip()
            if text.startswith("```"):
                # Extract content between code fences
                lines = text.split("\n")
                json_lines = []
                in_block = False
                for line in lines:
                    if line.startswith("```") and not in_block:
                        in_block = True
                        continue
                    if line.startswith("```") and in_block:
                        break
                    if in_block:
                        json_lines.append(line)
                text = "\n".join(json_lines)

            records = json.loads(text)
            if not isinstance(records, list):
                records = [records]

            logger.info(f"Successfully parsed {len(records)} records")
            return cast(list[dict[str, Any]], records)

        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON: {e}")
            logger.error(f"Raw output: {result_text[:500]}")
            raise RuntimeError(f"Failed to parse transformation result: {e}") from e
    async def ingest_and_validate(
        self,
        file_url: str,
        target_schema: type[T],
    ) -> list[T]:
        """Ingest and validate records against the Pydantic schema.

        Parameters
        ----------
        file_url : str
            URL to the raw CSV file to ingest.
        target_schema : type[T]
            Pydantic model defining the target data structure.

        Returns
        -------
        list[T]
            List of validated Pydantic model instances. Records that fail
            validation are logged and skipped.
        """
        records = await self.ingest(file_url, target_schema)

        validated: list[T] = []
        for record in records:
            try:
                validated.append(target_schema.model_validate(record))
            except Exception as e:
                logger.warning(f"Validation failed for record: {e}")

        return validated
    def convert_cityjson_to_deckgl(self, input_path: str, output_path: str) -> None:
        """Convert CityJSON to DeckGL-compatible GeoJSON."""
        from cjio import cityjson

        print(f"🏗️ REFINERY: Ingesting 3D Model {input_path}...")

        # 1. Load the CityJSON
        with open(input_path, "r") as f:
            cm = cityjson.CityJSON(file=f)

        # Force EPSG:3414 and Reproject
        if "metadata" not in cm.j:
            cm.j["metadata"] = {}
        cm.set_epsg(3414)

        print("Reprojecting to EPSG:4326...")
        try:
            cm.reproject(4326)
        except Exception as e:
            print(f"Reprojection failed (proceeding): {e}")

        # 2. Extract Features (Manual Conversion)
        features = []

        # CityJSON vertices may be integer-quantized; apply the transform
        # (scale + translate) when present so the output has real coordinates.
        transform = cm.j.get("transform")
        if transform:
            scale, translate = transform["scale"], transform["translate"]
            vertices = [
                [v[i] * scale[i] + translate[i] for i in range(3)]
                for v in cm.j["vertices"]
            ]
        else:
            vertices = cm.j["vertices"]

        def parse_boundary(b: list[int]) -> list[list[float]]:
            # Resolve vertex indices and close the ring if needed.
            ring = [vertices[i] for i in b]
            if ring and ring[0] != ring[-1]:
                ring.append(ring[0])
            return ring

        for obj_id, obj in cm.j.get("CityObjects", {}).items():
            geom = obj.get("geometry", [])
            for g in geom:
                gtype = g["type"]
                boundaries = g["boundaries"]
                polygons = []

                if gtype == "Solid":
                    # boundaries[shell][surface][ring]
                    if len(boundaries) > 0:
                        shell = boundaries[0]
                        for surface in shell:
                            polygons.append([parse_boundary(ring) for ring in surface])
                elif gtype in ("MultiSurface", "CompositeSurface"):
                    # boundaries[surface][ring]
                    for surface in boundaries:
                        polygons.append([parse_boundary(ring) for ring in surface])

                for poly in polygons:
                    features.append({
                        "type": "Feature",
                        "properties": {
                            "id": obj_id,
                            "obj_type": obj["type"],
                            **obj.get("attributes", {}),
                        },
                        "geometry": {"type": "Polygon", "coordinates": poly},
                    })

        # 3. Save
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, "w") as f:
            json.dump({"type": "FeatureCollection", "features": features}, f)

        print(f"✅ CONVERSION COMPLETE: Saved to {output_path}")
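

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the public API). The schema mirrors
# the module docstring; the source URL and thinking budget are hypothetical
# placeholders, and running this assumes GOOGLE_API_KEY is set and network
# access is available.
async def _demo_ingest() -> None:
    class PropertyRecord(BaseModel):
        address: str
        price: float
        area_sqm: float

    agent = RefineryAgent(thinking_budget=4096)
    records = await agent.ingest_and_validate(
        "https://example.com/raw_data.csv",  # hypothetical source URL
        PropertyRecord,
    )
    for record in records:
        logger.info("Validated record: %s", record.model_dump())

# To run the sketch above: `import asyncio; asyncio.run(_demo_ingest())`.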
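

# Illustrative sketch for the CityJSON converter (also not part of the public
# API). The file paths are hypothetical; constructing the agent still requires
# an API key even though the conversion itself never calls Gemini.
def _demo_convert_cityjson() -> None:
    agent = RefineryAgent()
    agent.convert_cityjson_to_deckgl(
        "data/raw/model.city.json",      # hypothetical CityJSON input
        "data/processed/model.geojson",  # hypothetical GeoJSON output
    )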