dbt: Freshness – Pattern-based monitoring with dynamic thresholds


✅ Le vrai besoin : des seuils dynamiques en fonction du moment ou du contexte

Le paragraphe parle d’une stratégie où :

  • on n’applique pas un seul warn_after/error_after
  • mais on adapte les seuils selon les heures, les jours, ou des patterns historiques

🚫 Limite de dbt natif

Dans schema.yml, tu écris :

freshness:
  warn_after: { count: 30, period: minute }

Mais ici, tu ne peux pas mettre de logique conditionnelle, ni utiliser {{ jinja }}.
Donc : ce n’est pas dynamique dans le fichier schema.yml.


Solution réaliste : Monitoring extérieur + logique dynamique personnalisée

Tu peux :

🧱 Étape 1 – Utiliser dbt source freshness pour collecter la fraîcheur réelle

dbt source freshness --select source:raw.ventes --output json

Ce JSON te dit :

{
  "status": "warn",
  "max_loaded_at": "2024-06-17T08:05:00",
  "snapshotted_at": "2024-06-17T08:30:00",
  "age": "00:25:00"
}

➡️ Donc tu connais la fraîcheur réelle


🧱 Étape 2 – Créer une table source_freshness_log dans ton entrepôt

Tu y stockes :

  • le nom de la source
  • age (fraîcheur)
  • timestamp
  • et un seuil dynamique selon l’heure

🧠 Étape 3 – Appliquer une logique comme :

SELECT
  source_name,
  freshness_age_minutes,
  CASE
    WHEN current_time BETWEEN '08:00:00' AND '18:00:00' THEN 15
    WHEN current_time BETWEEN '18:00:00' AND '00:00:00' THEN 60
    ELSE 360
  END AS freshness_threshold_minutes,
  CASE
    WHEN freshness_age_minutes > freshness_threshold_minutes THEN 'LATE'
    ELSE 'OK'
  END AS freshness_status
FROM freshness_log;

✅ Ici, le seuil dépend de l’heure de la journée


🧠 Variante possible : passer par Airflow / dbt Cloud scheduler

Tu peux aussi :

  • créer un job dbt source freshness toutes les heures
  • stocker les résultats dans BigQuery / Snowflake
  • comparer avec des seuils calculés dynamiquement dans un modèle dbt
  • déclencher une alerte (Slack, email) uniquement si une vraie anomalie est détectée

✅ Résumé réaliste

SolutionDynamique ?dbt natif ?
freshness: dans schema.yml❌ Non✅ Oui
JSON de dbt source freshness + post-traitement SQL✅ Oui✅ Avec un peu de boulot
Airflow/Cloud + table de logs✅ Oui🔁 Intégré au monitoring global

Exemple

  • dbt source freshness
  • un script Python pour parser le JSON et insérer dans une table
  • un modèle dbt monitor_freshness.sql pour appliquer la logique dynamique
  • un orchestrateur type Airflow ou cron (facultatif)

🧱 1. Fichier schema.yml (pour activer le suivi de fraîcheur)

version: 2

sources:
  - name: raw
    schema: raw_data
    tables:
      - name: ventes
        description: "Données de ventes brutes"
        loaded_at_field: date_chargement

Tu n’es pas obligé de définir freshness: ici si tu fais ton suivi dynamiquement ensuite.


🛠️ 2. Script Python pour exécuter dbt et charger les résultats

import subprocess
import json
import datetime
import pandas as pd
from google.cloud import bigquery  # ou snowflake.connector, selon ton DWH

def run_dbt_source_freshness():
    subprocess.run(["dbt", "source", "freshness", "--output", "json", "--target", "prod"])
    with open("target/manifest.json") as f:
        manifest = json.load(f)
    with open("target/freshness_output.json") as f:
        freshness_data = json.load(f)
    return freshness_data

def extract_freshness_rows(freshness_data):
    rows = []
    for source in freshness_data["results"]:
        rows.append({
            "source_name": source["unique_id"].split(".")[-1],
            "max_loaded_at": source["max_loaded_at"],
            "snapshotted_at": source["snapshotted_at"],
            "age_minutes": source["age"],
            "status": source["status"],
            "load_time": datetime.datetime.now()
        })
    return pd.DataFrame(rows)

def load_to_bigquery(df: pd.DataFrame):
    client = bigquery.Client()
    table_id = "mon_projet.monitoring.source_freshness_log"
    job = client.load_table_from_dataframe(df, table_id, job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"))
    job.result()

if __name__ == "__main__":
    data = run_dbt_source_freshness()
    df = extract_freshness_rows(data)
    load_to_bigquery(df)

🧠 Ce script :

  • exécute dbt source freshness
  • extrait les infos (age, status, snapshotted_at, etc.)
  • les charge dans une table source_freshness_log

🧪 3. Modèle dbt monitor_freshness.sql (analyse dynamique)

{{ config(materialized="table") }}

WITH last_freshness AS (
  SELECT *
  FROM {{ source('monitoring', 'source_freshness_log') }}
  QUALIFY ROW_NUMBER() OVER (PARTITION BY source_name ORDER BY load_time DESC) = 1
)

SELECT
  source_name,
  age_minutes,
  CURRENT_TIME() AS heure_analyse,
  CASE
    WHEN CURRENT_TIME() BETWEEN TIME('08:00:00') AND TIME('18:00:00') THEN 15
    WHEN CURRENT_TIME() BETWEEN TIME('18:00:00') AND TIME('00:00:00') THEN 60
    ELSE 360
  END AS seuil_minutes,
  CASE
    WHEN age_minutes > 
      CASE
        WHEN CURRENT_TIME() BETWEEN TIME('08:00:00') AND TIME('18:00:00') THEN 15
        WHEN CURRENT_TIME() BETWEEN TIME('18:00:00') AND TIME('00:00:00') THEN 60
        ELSE 360
      END
    THEN 'LATE'
    ELSE 'OK'
  END AS statut_fraicheur
FROM last_freshness

✅ Tu obtiens une table avec :
source_name | age_minutes | heure_analyse | seuil_minutes | statut_fraicheur


📆 4. Orchestration (Airflow ou cron)

  • Planifie un DAG ou une tâche cron qui :
    1. Exécute le script Python (freshness + BQ insert)
    2. Lance dbt run --select monitor_freshness
    3. Facultatif : envoie une alerte Slack si statut_fraicheur = 'LATE'

✅ Résumé du pipeline

ÉtapeAction
dbt source freshnessExtrait la fraîcheur réelle
Script PythonStocke dans source_freshness_log
Modèle dbt monitor_freshnessApplique des seuils dynamiques
RésultatTable avec statut temps réel et conditionnel
BonusAlertes si LATE, historisation possible

Leave a Reply

Your email address will not be published. Required fields are marked *