🎯 Why Create a Metrics Table for Test Results?
By default, `dbt test` shows pass/fail results per run, but it does not track trends over time.
Creating a metrics table allows you to:
- Persist test results (e.g., number of nulls, failed rows)
- Calculate rolling averages, trend lines, and SLA breaches
- Build dashboards or alerts from historical test performance
🧠 Key Concepts
| Concept | Description |
|---|---|
| dbt test results | Stored in `target/run_results.json` after each `dbt test` run |
| Metrics ingestion | Parse this JSON and insert it into a historical table |
| Metrics table | Stores `test_run_id`, model name, run time, failure counts, etc. |
| Trend analysis | Use dbt models or dashboards to visualize long-term quality |
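Before wiring up the full pipeline, it helps to confirm which of these fields your dbt version actually writes. Below is a minimal inspection sketch, assuming `dbt test` has been run at least once so that `target/run_results.json` exists; the field names shown are from the dbt 1.x artifact schema.

```python
import json

# Peek at the fields the metrics pipeline below relies on
with open('target/run_results.json') as f:
    run_results = json.load(f)

print("invocation_id:", run_results['metadata']['invocation_id'])
print("generated_at: ", run_results['metadata']['generated_at'])

# Show the first few test results
for result in run_results['results'][:5]:
    print(
        result['unique_id'],
        result['status'],
        result.get('failures'),        # None for passing tests
        result.get('execution_time'),
    )
```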
📦 Step-by-Step Implementation
✅ Step 1: Create a metrics table
In `models/monitoring/test_metrics.sql`:
```sql
-- Grain: one row per test per dbt invocation, so the unique key must
-- combine the run and the test (a single invocation runs many tests)
{{ config(materialized='incremental', unique_key=['test_run_id', 'test_name']) }}

with raw_test_results as (
    select * from {{ ref('stg_test_results') }}
),

filtered as (
    select
        test_run_id,
        model_name,
        test_name,
        status,
        failures,
        execution_time,
        run_started_at::timestamp as run_time
    from raw_test_results

    {% if is_incremental() %}
    -- only process rows newer than what is already in the table
    where run_started_at::timestamp > (select max(run_time) from {{ this }})
    {% endif %}
)

select * from filtered
```
This assumes your test results have already been landed in a staging table called `stg_test_results` (the script in Step 2 loads it). Since that table is populated outside dbt, you may prefer to declare it as a source and reference it with `source()` instead of `ref()`.
✅ Step 2: Create a Python script to parse `run_results.json` and load it
In recent dbt versions (1.x), each entry in `run_results.json` only carries the test's `unique_id`, so the script below also reads `target/manifest.json` to resolve which model each test is attached to.
In `scripts/load_test_results.py`:
```python
import json

import pandas as pd
from sqlalchemy import create_engine

# run_results.json holds the outcome of each test; manifest.json holds
# node metadata (names, dependencies) keyed by unique_id
with open('target/run_results.json') as f:
    run_results = json.load(f)
with open('target/manifest.json') as f:
    manifest = json.load(f)

records = []
for result in run_results['results']:
    unique_id = result['unique_id']
    node = manifest['nodes'].get(unique_id, {})

    # A generic test depends on the model it is attached to
    model_deps = [
        dep for dep in node.get('depends_on', {}).get('nodes', [])
        if dep.startswith('model.')
    ]

    records.append({
        "test_run_id": run_results['metadata']['invocation_id'],
        "model_name": model_deps[0].split('.')[-1] if model_deps else None,
        "test_name": unique_id,
        "status": result['status'],
        "failures": result.get('failures') or 0,  # null for passing tests
        "execution_time": result.get('execution_time', 0.0),
        "run_started_at": run_results['metadata']['generated_at'],
    })

df = pd.DataFrame(records)

# Load into the staging table (replace with your actual warehouse connection string)
engine = create_engine('postgresql://user:pass@host:port/db')
df.to_sql('stg_test_results', engine, if_exists='append', index=False)
```
✅ Step 3: Run your pipeline
```bash
dbt test                              # run tests; writes target/run_results.json
python scripts/load_test_results.py   # parse the artifacts and append to stg_test_results
dbt run --select test_metrics         # update the incremental metrics model
```
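If you want to run these three commands as one scheduled job (cron, CI, an orchestrator), a thin Python wrapper is one option. This is a sketch rather than part of the project above; the key detail is that `dbt test` exits non-zero when any test fails, which is exactly when you still want to load and persist the results.

```python
import subprocess
import sys

# dbt test exits non-zero when any test fails; capture the return code
# instead of aborting, so failing runs are still recorded.
test_result = subprocess.run(['dbt', 'test'])

# Persist the results of this invocation, whatever the outcome was
subprocess.run([sys.executable, 'scripts/load_test_results.py'], check=True)

# Rebuild the incremental metrics model on top of the freshly loaded rows
subprocess.run(['dbt', 'run', '--select', 'test_metrics'], check=True)

# Propagate the original test status so CI still fails on failing tests
sys.exit(test_result.returncode)
```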
✅ Step 4: Use your metrics table to analyze trends
Example: `models/monitoring/test_failure_trends.sql`
```sql
select
    model_name,
    test_name,
    date_trunc('day', run_time) as day,
    count(*) filter (where failures > 0) as failure_count,
    count(*) as total_runs,
    avg(failures) as avg_failures
from {{ ref('test_metrics') }}
group by 1, 2, 3
```
You can visualize this in dbt Cloud, Metabase, Superset, or Looker.
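If you just want a quick look without a BI tool, you can also pull the trend model into pandas and compute rolling averages directly. A minimal sketch, reusing the placeholder connection string from Step 2 and assuming the `test_failure_trends` model is built in your default schema:

```python
import pandas as pd
from sqlalchemy import create_engine

# Same placeholder connection string as the loader script in Step 2
engine = create_engine('postgresql://user:pass@host:port/db')

df = pd.read_sql('select * from test_failure_trends', engine)

# 7-row (roughly 7-day) rolling average of failures per model/test pair
df['rolling_avg_failures'] = (
    df.sort_values('day')
      .groupby(['model_name', 'test_name'])['avg_failures']
      .transform(lambda s: s.rolling(window=7, min_periods=1).mean())
)

print(df.tail(10))
```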
💡 Example Use Cases
| Use Case | Enabled by metrics table? |
|---|---|
| 🧠 Track % of passing tests daily | ✅ |
| 📈 Identify models with rising failure rate | ✅ |
| ⏱ Detect slow-running tests | ✅ |
| 🚨 Trigger alerts if failures > threshold | ✅ |
| 📊 Create data quality dashboards | ✅ |
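For the alerting use case in particular, a small script that queries the metrics table and exits non-zero on a breach is often enough to start with. A sketch, assuming the `test_metrics` model from Step 1, the same placeholder Postgres connection, and a hypothetical `FAILURE_THRESHOLD` constant:

```python
import sys

import pandas as pd
from sqlalchemy import create_engine

FAILURE_THRESHOLD = 100  # hypothetical threshold; tune per test

engine = create_engine('postgresql://user:pass@host:port/db')

# Threshold breaches recorded in the last 24 hours (Postgres interval syntax)
query = f"""
    select model_name, test_name, failures, run_time
    from test_metrics
    where failures > {FAILURE_THRESHOLD}
      and run_time > now() - interval '1 day'
"""
breaches = pd.read_sql(query, engine)

if not breaches.empty:
    print("Data quality alert: tests exceeding the failure threshold")
    print(breaches.to_string(index=False))
    sys.exit(1)  # non-zero exit lets CI/cron/your scheduler notify

print("No threshold breaches in the last 24 hours")
```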
✅ Summary
- `run_results.json` contains rich test data, but only for the latest run
- Parsing and persisting it into a historical metrics table unlocks:
  - Trend analysis
  - SLA tracking
  - Proactive monitoring
- This pattern is adapter-agnostic and works with all dbt projects