"""Autonomous ETL Agent using Gemini Thinking Mode + Code Execution.
This module implements the RefineryAgent that ingests raw CSV data,
uses Gemini's thinking capabilities to reason about schema mappings,
and generates Python transformation code via Code Execution.
Example
-------
>>> from unsprawl.refinery import RefineryAgent
>>> from pydantic import BaseModel
>>>
>>> class PropertyRecord(BaseModel):
... address: str
... price: float
... area_sqm: float
>>>
>>> agent = RefineryAgent()
>>> records = await agent.ingest(
... "https://example.com/raw_data.csv",
... PropertyRecord
... )
"""
from __future__ import annotations
import json
import logging
import os
from typing import Any, TypeVar, cast
import google.genai as genai
from google.genai import types
from pydantic import BaseModel
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=BaseModel)
class RefineryAgent:
"""Autonomous ETL agent that transforms raw CSV to structured Pydantic models.
Uses Gemini's Thinking Mode for deep reasoning about column mappings
and Code Execution to run the generated transformation scripts.
Parameters
----------
model : str
The Gemini model to use. Defaults to gemini-2.5-flash for thinking support.
api_key : str | None
Google API key. If None, reads from GOOGLE_API_KEY environment variable.
thinking_budget : int
Token budget for thinking. Higher values enable deeper reasoning.
Notes
-----
- Requires `google-genai` SDK (NOT `google.generativeai`)
- Code Execution runs in a sandboxed environment
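
    Examples
    --------
    A minimal sketch; assumes ``GOOGLE_API_KEY`` is set in the environment
    and ``PropertyRecord`` is a user-defined Pydantic model:

    >>> agent = RefineryAgent(model="gemini-2.5-flash", thinking_budget=4096)
    >>> records = await agent.ingest("https://example.com/raw.csv", PropertyRecord)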
"""
def __init__(
self,
model: str = "gemini-2.5-flash",
api_key: str | None = None,
thinking_budget: int = 8192,
) -> None:
self.model = model
self.thinking_budget = thinking_budget
# Initialize the client
api_key = api_key or os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError(
"API key required. Set GOOGLE_API_KEY environment variable "
"or pass api_key parameter."
)
self.client = genai.Client(api_key=api_key)
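        # Async calls go through client.aio (see ingest() below).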
def _schema_to_prompt(self, schema_cls: type[BaseModel]) -> str:
"""Convert a Pydantic model to a prompt-friendly schema description."""
schema = schema_cls.model_json_schema()
return json.dumps(schema, indent=2)
async def ingest(
self,
file_url: str,
target_schema: type[T],
) -> list[dict[str, Any]]:
"""Ingest a CSV URL and transform it to match the target schema.
Parameters
----------
file_url : str
URL to the raw CSV file to ingest.
target_schema : type[BaseModel]
Pydantic model defining the target data structure.
Returns
-------
list[dict[str, Any]]
List of records conforming to the target schema.
Raises
------
RuntimeError
If code execution fails or returns invalid data.
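
        Examples
        --------
        Illustrative only; the URL and ``PropertyRecord`` model are placeholders:

        >>> records = await agent.ingest(
        ...     "https://example.com/listings.csv",
        ...     PropertyRecord,
        ... )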
"""
schema_prompt = self._schema_to_prompt(target_schema)
prompt = f"""You are a data engineering expert. Your task is to:
1. Download the CSV from: {file_url}
2. Analyze the column names and data types
3. Map them to this target schema:
```json
{schema_prompt}
```
4. Write a Python script that:
- Downloads the CSV using requests
- Parses it with pandas
- Transforms/renames columns to match the target schema
- Handles missing values appropriately
- Returns the cleaned data as a JSON array
Think deeply about:
- Column name variations (e.g., "sqft" vs "area_sqm")
- Unit conversions if needed
- Data type coercions
- Null/missing value strategies
After reasoning, execute the transformation code and return ONLY the JSON array.
"""
# Configure with Thinking Mode and Code Execution
config = types.GenerateContentConfig(
thinking_config=types.ThinkingConfig(
thinking_budget=self.thinking_budget,
),
tools=[
types.Tool(code_execution=types.ToolCodeExecution()),
],
)
logger.info(f"Starting ingestion from {file_url}")
response = await self.client.aio.models.generate_content(
model=self.model,
contents=prompt,
config=config,
)
# Extract the result from the response
if not response.candidates:
raise RuntimeError("No response candidates returned from Gemini")
        # Look for code execution results; fall back to any plain text part.
        # Part fields always exist on the google-genai response models, so test
        # for non-None values instead of relying on hasattr().
        result_text = ""
        for part in response.candidates[0].content.parts:
            if part.executable_code is not None:
                logger.debug(f"Executed code: {part.executable_code.code[:200]}...")
            if part.code_execution_result is not None:
                result_text = part.code_execution_result.output
                logger.info("Code execution completed successfully")
            elif part.text:
                result_text = part.text
if not result_text:
raise RuntimeError("No execution result or text returned")
# Parse the JSON result
try:
# Try to extract JSON array from the response
# Handle cases where the response contains markdown code blocks
text = result_text.strip()
if text.startswith("```"):
# Extract content between code fences
lines = text.split("\n")
json_lines = []
in_block = False
for line in lines:
if line.startswith("```") and not in_block:
in_block = True
continue
if line.startswith("```") and in_block:
break
if in_block:
json_lines.append(line)
text = "\n".join(json_lines)
records = json.loads(text)
if not isinstance(records, list):
records = [records]
logger.info(f"Successfully parsed {len(records)} records")
return cast(list[dict[str, Any]], records)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON: {e}")
logger.error(f"Raw output: {result_text[:500]}")
raise RuntimeError(f"Failed to parse transformation result: {e}") from e
async def ingest_and_validate(
self,
file_url: str,
target_schema: type[T],
) -> list[T]:
"""Ingest and validate records against the Pydantic schema.
Parameters
----------
file_url : str
URL to the raw CSV file to ingest.
target_schema : type[T]
Pydantic model defining the target data structure.
Returns
-------
        list[T]
            List of validated Pydantic model instances. Records that fail
            validation are skipped with a warning.
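
        Examples
        --------
        Illustrative only; ``PropertyRecord`` is a placeholder Pydantic model:

        >>> properties = await agent.ingest_and_validate(
        ...     "https://example.com/listings.csv",
        ...     PropertyRecord,
        ... )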
"""
records = await self.ingest(file_url, target_schema)
        validated: list[T] = []
        for record in records:
            try:
                validated.append(target_schema.model_validate(record))
            except Exception as e:
                logger.warning(f"Validation failed for record: {e}")
        return validated
def convert_cityjson_to_deckgl(self, input_path: str, output_path: str) -> None:
"""Convert CityJSON to DeckGL-compatible GeoJSON."""
        # Imported here so cjio is only required when 3D conversion is used;
        # json is already imported at module level.
        from cjio import cityjson
print(f"🏗️ REFINERY: Ingesting 3D Model {input_path}...")
# 1. Load the CityJSON
with open(input_path, 'r') as f:
cm = cityjson.CityJSON(file=f)
# Force EPSG:3414 and Reproject
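        # EPSG:3414 is SVY21 (Singapore TM); deck.gl layers expect WGS84
        # longitude/latitude (EPSG:4326), hence the reprojection below.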
if "metadata" not in cm.j:
cm.j["metadata"] = {}
cm.set_epsg(3414)
print("Reprojecting to EPSG:4326...")
try:
cm.reproject(4326)
except Exception as e:
print(f"Reprojection failed (proceeding): {e}")
# 2. Extract Features (Manual Conversion)
features = []
vertices = cm.j["vertices"]
def parse_boundary(b: list[int]) -> list[list[float]]:
ring = [vertices[i] for i in b]
if ring and ring[0] != ring[-1]:
ring.append(ring[0])
return ring
for obj_id, obj in cm.j.get("CityObjects", {}).items():
geom = obj.get("geometry", [])
for g in geom:
gtype = g["type"]
boundaries = g["boundaries"]
polygons = []
if gtype == "Solid":
# boundaries[shell][surface][ring]
if len(boundaries) > 0:
shell = boundaries[0]
for surface in shell:
polygons.append([parse_boundary(ring) for ring in surface])
elif gtype == "MultiSurface" or gtype == "CompositeSurface":
# boundaries[surface][ring]
for surface in boundaries:
polygons.append([parse_boundary(ring) for ring in surface])
for poly in polygons:
features.append({
"type": "Feature",
"properties": {
"id": obj_id,
"obj_type": obj["type"],
**obj.get("attributes", {})
},
"geometry": {"type": "Polygon", "coordinates": poly}
})
# 3. Save
        # os is imported at module level; guard against an empty dirname.
        os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
with open(output_path, 'w') as f:
json.dump({"type": "FeatureCollection", "features": features}, f)
print(f"✅ CONVERSION COMPLETE: Saved to {output_path}")