dbt: how to implement a metrics table to store test results


🎯 Why Create a Metrics Table for Test Results?

By default, dbt test shows pass/fail results per run, but does not track trends over time.

Creating a metrics table allows you to:

  • Persist test results (e.g., number of nulls, failed rows)
  • Calculate rolling averages, trend lines, and SLA breaches
  • Build dashboards or alerts from historical test performance

🧠 Key Concepts

| Concept | Description |
| --- | --- |
| dbt test results | Stored in target/run_results.json after each dbt test run |
| Metrics ingestion | Parse this JSON and insert it into a historical table |
| Metrics table | Stores the test run ID, model name, test name, status, failure count, execution time, and run time |
| Trend analysis | Use dbt models or dashboards to visualize long-term quality |
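
For orientation, here is an abridged run_results.json showing only the fields used below (values are illustrative, and the exact schema varies between dbt versions):

{
  "metadata": {
    "invocation_id": "c6b3f1a2-0d4e-4b6e-9a1d-3f2e8c7b5a10",
    "generated_at": "2024-05-01T10:32:00.000000Z"
  },
  "results": [
    {
      "unique_id": "test.my_project.not_null_orders_order_id.abc123",
      "status": "fail",
      "failures": 42,
      "execution_time": 1.37
    }
  ]
}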

📦 Step-by-Step Implementation


✅ Step 1: Create a metrics table

In models/monitoring/test_metrics.sql:

{{ config(
    materialized='incremental',
    unique_key=['test_run_id', 'test_name']
) }}

with raw_test_results as (

  -- raw results loaded by the Python script in Step 2,
  -- declared as a dbt source (see the yml sketch below)
  select * from {{ source('monitoring', 'stg_test_results') }}

),

filtered as (

  select
    test_run_id,
    model_name,
    test_name,
    status,
    failures,
    execution_time,
    run_started_at::timestamp as run_time
  from raw_test_results

  {% if is_incremental() %}
  -- only process runs newer than what is already stored
  where run_started_at::timestamp > (select max(run_time) from {{ this }})
  {% endif %}

)

select * from filtered

This assumes the Python script in Step 2 has already loaded your test results into a warehouse table called stg_test_results. Because that table is created outside of dbt, the model references it with source() rather than ref(). Note that the unique key combines test_run_id and test_name, since a single dbt invocation produces one row per test.
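
To make the source() call resolve, declare the table in a properties file. A minimal sketch, assuming the loader writes to the public schema (the monitoring source name and the schema are placeholders to adjust for your setup):

In models/monitoring/sources.yml:

version: 2

sources:
  - name: monitoring          # placeholder source name used in test_metrics.sql
    schema: public            # assumes the Python loader in Step 2 writes here
    tables:
      - name: stg_test_results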


✅ Step 2: Create a Python script to parse run_results.json and load it

In scripts/load_test_results.py:

import json
import pandas as pd
from sqlalchemy import create_engine

# Load run_results.json produced by `dbt test`
with open('target/run_results.json') as f:
    data = json.load(f)

records = []
for result in data['results']:
    # Some dbt versions embed the full node under "node"; recent versions only
    # expose "unique_id" (e.g. "test.my_project.not_null_orders_order_id.abc123"),
    # so fall back to parsing the test name out of it.
    node = result.get('node') or {}
    records.append({
        "test_run_id": data['metadata']['invocation_id'],
        "model_name": node.get('name', result['unique_id'].split('.')[2]),
        "test_name": result['unique_id'],
        "status": result['status'],
        # "failures" can be null for skipped or errored tests
        "failures": result.get('failures') or 0,
        "execution_time": result.get('execution_time') or 0.0,
        "run_started_at": data['metadata']['generated_at']
    })

df = pd.DataFrame(records)

# Load into the staging table; to_sql creates it on the first run
# (replace the connection string with your actual warehouse connection)
engine = create_engine('postgresql://user:pass@host:port/db')
df.to_sql('stg_test_results', engine, if_exists='append', index=False)
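
Hard-coding credentials is fine for a demo, but in practice you would likely read the connection string from the environment. A sketch of the swap (DBT_METRICS_DB_URL is just a name chosen here, not a dbt convention):

import os

# read the warehouse connection string from an environment variable
# instead of embedding credentials in the script
engine = create_engine(os.environ["DBT_METRICS_DB_URL"])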

✅ Step 3: Run your pipeline

dbt test
python scripts/load_test_results.py
dbt run --select test_metrics
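
Keep in mind that dbt test exits with a non-zero status when any test fails, which would stop a naive CI job before the loader ever runs. A sketch of a bash wrapper around the three commands above that always loads results and then surfaces the original exit code:

#!/usr/bin/env bash

# run the tests; deliberately not using `set -e` so a test failure
# does not abort the script before results are persisted
dbt test
test_exit=$?

# persist the results and rebuild the metrics model
python scripts/load_test_results.py
dbt run --select test_metrics

# report the original test status back to CI
exit $test_exit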

✅ Step 4: Use your metrics table to analyze trends

Example: models/monitoring/test_failure_trends.sql

select
  model_name,
  test_name,
  date_trunc('day', run_time) as day,
  count(*) filter (where failures > 0) as failure_count,
  count(*) as total_runs,
  avg(failures) as avg_failures
from {{ ref('test_metrics') }}
group by 1, 2, 3

You can visualize this in dbt Cloud, Metabase, Superset, or Looker.
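
To get the rolling averages mentioned earlier, you can layer a window function on top of this model. A sketch in Postgres syntax, assuming test_failure_trends is materialized so it can be referenced (the 7-day window is an arbitrary choice):

select
  model_name,
  test_name,
  day,
  failure_count,
  total_runs,
  -- 7-day rolling failure rate; works because the upstream model
  -- produces one row per model/test/day
  (sum(failure_count) over (
     partition by model_name, test_name
     order by day
     rows between 6 preceding and current row
  ))::float
  / nullif(sum(total_runs) over (
     partition by model_name, test_name
     order by day
     rows between 6 preceding and current row
  ), 0) as rolling_7d_failure_rate
from {{ ref('test_failure_trends') }}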


🛑 Example Use Cases

| Use Case | Enabled by metrics table? |
| --- | --- |
| 🧭 Track % of passing tests daily | ✅ |
| 📉 Identify models with a rising failure rate | ✅ |
| ⏱ Detect slow-running tests | ✅ |
| 🚨 Trigger alerts if failures > threshold (see the sketch below) | ✅ |
| 📊 Create data quality dashboards | ✅ |
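
For the alerting row above, one lightweight option is a singular dbt test over the metrics table, so a scheduled dbt test run fails (and can trigger your alerting) whenever a threshold is crossed. A sketch, with the file path, 7-day lookback, and 3-run threshold all chosen arbitrarily:

In tests/assert_test_failures_under_threshold.sql:

-- returns one row per model/test that had more than 3 failing runs in
-- the last 7 days; any returned row makes this singular test fail
select
  model_name,
  test_name,
  count(*) filter (where failures > 0) as failing_runs
from {{ ref('test_metrics') }}
where run_time >= current_date - interval '7 days'
group by 1, 2
having count(*) filter (where failures > 0) > 3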

✅ Summary

  • run_results.json contains rich test data, but only for the latest run
  • Parsing and persisting it into a historical metrics table unlocks:
    • Trend analysis
    • SLA tracking
    • Proactive monitoring
  • The pattern itself works in any dbt project, though the example SQL uses Postgres syntax (casts, filter clauses) you may need to adapt for your warehouse
