OpenLineage integration for SQLMesh. Automatically emits lineage events to Marquez or any OpenLineage-compatible backend.
- Table-level lineage: Track which models depend on which upstream models
- Column-level lineage: Track which columns flow from source to destination
- Schema capture: Column names and types for each model
- Execution stats: Duration, rows processed, bytes processed
- Per-model events: START/COMPLETE/FAIL events for each model evaluation
pip install sqlmesh-openlineage

Or with uv:

uv add sqlmesh-openlineage

Note: This package requires Python-based SQLMesh configuration (config.py), not YAML configuration.
Add this to your config.py:
import sqlmesh_openlineage
sqlmesh_openlineage.install(
url="http://localhost:5000",
namespace="my_project",
# api_key="...", # optional
)
from sqlmesh.core.config import Config
config = Config(
# ... your existing config
)

Then run sqlmesh run as normal. OpenLineage events will be emitted for each model evaluation.
You can also configure via environment variables:
export OPENLINEAGE_URL=http://localhost:5000
export OPENLINEAGE_NAMESPACE=my_project
export OPENLINEAGE_API_KEY=... # optional

Then in config.py:
import sqlmesh_openlineage
sqlmesh_openlineage.install() # reads from env vars

This package uses SQLMesh's set_console() API to inject a custom Console wrapper. The wrapper intercepts per-snapshot lifecycle events and emits corresponding OpenLineage events:
- START event when a model evaluation begins
- COMPLETE event when evaluation succeeds (includes execution stats)
- FAIL event when evaluation fails or audits fail
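
The wrapper idea can be illustrated with a small, self-contained sketch. This is not the package's code and not SQLMesh's Console interface: the method names (on_model_start, on_model_success, on_model_failure), the RecordingConsole stand-in, and the event dictionaries are all hypothetical, chosen only to show the delegation pattern in which each intercepted call emits an event and is then forwarded unchanged.

```python
from typing import Any, Callable, Dict, List


class RecordingConsole:
    """Hypothetical stand-in for the wrapped console; it only records calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def on_model_start(self, model: str) -> None:
        self.calls.append(f"start:{model}")

    def on_model_success(self, model: str, rows: int) -> None:
        self.calls.append(f"success:{model}")

    def on_model_failure(self, model: str, error: str) -> None:
        self.calls.append(f"failure:{model}")

    def log_status(self, message: str) -> None:
        self.calls.append(f"status:{message}")


class LineageConsoleWrapper:
    """Emits a lineage event, then forwards the call to the wrapped console."""

    def __init__(self, inner: Any, emit: Callable[[Dict[str, Any]], None]) -> None:
        self._inner = inner
        self._emit = emit

    def on_model_start(self, model: str) -> None:
        self._emit({"eventType": "START", "job": model})
        self._inner.on_model_start(model)

    def on_model_success(self, model: str, rows: int) -> None:
        self._emit({"eventType": "COMPLETE", "job": model, "rows": rows})
        self._inner.on_model_success(model, rows)

    def on_model_failure(self, model: str, error: str) -> None:
        self._emit({"eventType": "FAIL", "job": model, "error": error})
        self._inner.on_model_failure(model, error)

    def __getattr__(self, name: str) -> Any:
        # Only called for attributes not defined on the wrapper itself,
        # so every non-intercepted method passes straight through.
        return getattr(self._inner, name)


events: List[Dict[str, Any]] = []
inner = RecordingConsole()
console = LineageConsoleWrapper(inner, events.append)

console.on_model_start("customers")
console.on_model_success("customers", rows=42)
console.on_model_failure("orders", error="audit failed")
console.log_status("done")  # not intercepted, forwarded as-is
```

Because the wrapper forwards everything it does not intercept, the wrapped console keeps behaving as before; only the lifecycle calls gain the extra emit step.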
| SQLMesh Event | OpenLineage Event | Data Included |
|---|---|---|
| Model evaluation start | RunEvent(START) | Input datasets, output dataset with schema, column lineage |
| Model evaluation success | RunEvent(COMPLETE) | Execution stats (rows, bytes, duration) |
| Model evaluation failure | RunEvent(FAIL) | Error message |
| Audit failure | RunEvent(FAIL) | Audit failure details |
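
To make the table more concrete, here is a rough sketch of what START and COMPLETE payloads for one model evaluation could look like as plain JSON. The top-level field names follow the OpenLineage RunEvent structure; the producer URL, the spec version in schemaURL, the column types, and the build_run_event helper are assumptions for illustration, and facet metadata fields are omitted for brevity. The events the package actually emits may differ.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

PRODUCER = "https://example.com/sqlmesh-openlineage"  # hypothetical producer URI
# Spec version below is an assumption, not taken from the package.
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"


def build_run_event(
    event_type: str,
    run_id: str,
    namespace: str,
    job_name: str,
    inputs: List[str],
    output: str,
    output_fields: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Build a simplified RunEvent dictionary (hypothetical helper)."""
    output_dataset: Dict[str, Any] = {"namespace": namespace, "name": output}
    if output_fields:
        # Simplified schema facet: column names and types of the output dataset.
        output_dataset["facets"] = {
            "schema": {
                "fields": [{"name": n, "type": t} for n, t in output_fields.items()]
            }
        }
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": namespace, "name": name} for name in inputs],
        "outputs": [output_dataset],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


# START and COMPLETE for the same evaluation share one runId.
run_id = str(uuid.uuid4())
start = build_run_event(
    "START", run_id, "my_project", "customer_summary",
    inputs=["customers", "orders"],
    output="customer_summary",
    output_fields={"customer_id": "INT", "customer_name": "TEXT", "total_orders": "BIGINT"},
)
complete = build_run_event(
    "COMPLETE", run_id, "my_project", "customer_summary",
    inputs=["customers", "orders"],
    output="customer_summary",
)
payload = json.dumps(start, indent=2)
```

Sharing the runId is what lets a backend such as Marquez stitch the START and the later COMPLETE or FAIL event into a single run.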
The integration automatically extracts column-level lineage using SQLMesh's built-in lineage analysis. For example, if you have:
-- customers.sql
SELECT customer_id, name, email FROM raw_customers
-- customer_summary.sql
SELECT
c.customer_id,
c.name as customer_name,
COUNT(o.order_id) as total_orders
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name

The lineage will show that customer_summary.customer_name traces back to customers.name.
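
As a rough illustration, the column mapping for customer_summary could be expressed in the shape of an OpenLineage column lineage facet, where each output column lists its input fields. The mapping below is written by hand from the SQL above, not produced by the package; the my_project namespace and the column_lineage_facet helper are assumptions for the example.

```python
from typing import Any, Dict, List, Tuple

NAMESPACE = "my_project"  # assumed namespace, matching the install() example

# Output column -> list of (upstream table, upstream column), read off the SQL.
column_sources: Dict[str, List[Tuple[str, str]]] = {
    "customer_id": [("customers", "customer_id")],
    "customer_name": [("customers", "name")],
    "total_orders": [("orders", "order_id")],
}


def column_lineage_facet(
    sources: Dict[str, List[Tuple[str, str]]], namespace: str
) -> Dict[str, Any]:
    """Arrange a column mapping in the columnLineage facet layout (hypothetical helper)."""
    return {
        "fields": {
            column: {
                "inputFields": [
                    {"namespace": namespace, "name": table, "field": field}
                    for table, field in upstream
                ]
            }
            for column, upstream in sources.items()
        }
    }


facet = column_lineage_facet(column_sources, NAMESPACE)
customer_name_inputs = facet["fields"]["customer_name"]["inputFields"]
```

Here customer_name_inputs holds a single entry pointing at the name column of customers, which is the relationship described above.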
# Start Marquez (requires Docker)
docker compose up -d
# Configure and run SQLMesh
export OPENLINEAGE_URL=http://localhost:5001
sqlmesh run
# View lineage at http://localhost:3000

# Install dependencies
uv sync --dev
# Run tests (unit + integration)
uv run pytest tests/ -v
# Run Marquez integration test (requires Docker)
docker compose up -d
uv run pytest tests/test_marquez_integration.py -v -s
docker compose down

License: MIT