✅ Le vrai besoin : des seuils dynamiques en fonction du moment ou du contexte
Le paragraphe parle d’une stratégie où :
- on n’applique pas un seul
warn_after
/error_after
- mais on adapte les seuils selon les heures, les jours, ou des patterns historiques
🚫 Limite de dbt natif
Dans schema.yml
, tu écris :
freshness:
warn_after: { count: 30, period: minute }
Mais ici, tu ne peux pas mettre de logique conditionnelle, ni utiliser {{ jinja }}
.
Donc : ce n’est pas dynamique dans le fichier schema.yml
.
✅ Solution réaliste : Monitoring extérieur + logique dynamique personnalisée
Tu peux :
🧱 Étape 1 – Utiliser dbt source freshness
pour collecter la fraîcheur réelle
dbt source freshness --select source:raw.ventes --output json
Ce JSON te dit :
{
"status": "warn",
"max_loaded_at": "2024-06-17T08:05:00",
"snapshotted_at": "2024-06-17T08:30:00",
"age": "00:25:00"
}
➡️ Donc tu connais la fraîcheur réelle
🧱 Étape 2 – Créer une table source_freshness_log
dans ton entrepôt
Tu y stockes :
- le nom de la source
age
(fraîcheur)timestamp
- et un seuil dynamique selon l’heure
🧠 Étape 3 – Appliquer une logique comme :
SELECT
source_name,
freshness_age_minutes,
CASE
WHEN current_time BETWEEN '08:00:00' AND '18:00:00' THEN 15
WHEN current_time BETWEEN '18:00:00' AND '00:00:00' THEN 60
ELSE 360
END AS freshness_threshold_minutes,
CASE
WHEN freshness_age_minutes > freshness_threshold_minutes THEN 'LATE'
ELSE 'OK'
END AS freshness_status
FROM freshness_log;
✅ Ici, le seuil dépend de l’heure de la journée
🧠 Variante possible : passer par Airflow / dbt Cloud scheduler
Tu peux aussi :
- créer un job
dbt source freshness
toutes les heures - stocker les résultats dans BigQuery / Snowflake
- comparer avec des seuils calculés dynamiquement dans un modèle dbt
- déclencher une alerte (Slack, email) uniquement si une vraie anomalie est détectée
✅ Résumé réaliste
Solution | Dynamique ? | dbt natif ? |
---|---|---|
freshness: dans schema.yml | ❌ Non | ✅ Oui |
JSON de dbt source freshness + post-traitement SQL | ✅ Oui | ✅ Avec un peu de boulot |
Airflow/Cloud + table de logs | ✅ Oui | 🔁 Intégré au monitoring global |
Exemple
dbt source freshness
- un script Python pour parser le JSON et insérer dans une table
- un modèle dbt
monitor_freshness.sql
pour appliquer la logique dynamique - un orchestrateur type Airflow ou cron (facultatif)
🧱 1. Fichier schema.yml
(pour activer le suivi de fraîcheur)
version: 2
sources:
- name: raw
schema: raw_data
tables:
- name: ventes
description: "Données de ventes brutes"
loaded_at_field: date_chargement
Tu n’es pas obligé de définir
freshness:
ici si tu fais ton suivi dynamiquement ensuite.
🛠️ 2. Script Python pour exécuter dbt et charger les résultats
import subprocess
import json
import datetime
import pandas as pd
from google.cloud import bigquery # ou snowflake.connector, selon ton DWH
def run_dbt_source_freshness():
subprocess.run(["dbt", "source", "freshness", "--output", "json", "--target", "prod"])
with open("target/manifest.json") as f:
manifest = json.load(f)
with open("target/freshness_output.json") as f:
freshness_data = json.load(f)
return freshness_data
def extract_freshness_rows(freshness_data):
rows = []
for source in freshness_data["results"]:
rows.append({
"source_name": source["unique_id"].split(".")[-1],
"max_loaded_at": source["max_loaded_at"],
"snapshotted_at": source["snapshotted_at"],
"age_minutes": source["age"],
"status": source["status"],
"load_time": datetime.datetime.now()
})
return pd.DataFrame(rows)
def load_to_bigquery(df: pd.DataFrame):
client = bigquery.Client()
table_id = "mon_projet.monitoring.source_freshness_log"
job = client.load_table_from_dataframe(df, table_id, job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"))
job.result()
if __name__ == "__main__":
data = run_dbt_source_freshness()
df = extract_freshness_rows(data)
load_to_bigquery(df)
🧠 Ce script :
- exécute
dbt source freshness
- extrait les infos (
age
,status
,snapshotted_at
, etc.) - les charge dans une table
source_freshness_log
🧪 3. Modèle dbt monitor_freshness.sql
(analyse dynamique)
{{ config(materialized="table") }}
WITH last_freshness AS (
SELECT *
FROM {{ source('monitoring', 'source_freshness_log') }}
QUALIFY ROW_NUMBER() OVER (PARTITION BY source_name ORDER BY load_time DESC) = 1
)
SELECT
source_name,
age_minutes,
CURRENT_TIME() AS heure_analyse,
CASE
WHEN CURRENT_TIME() BETWEEN TIME('08:00:00') AND TIME('18:00:00') THEN 15
WHEN CURRENT_TIME() BETWEEN TIME('18:00:00') AND TIME('00:00:00') THEN 60
ELSE 360
END AS seuil_minutes,
CASE
WHEN age_minutes >
CASE
WHEN CURRENT_TIME() BETWEEN TIME('08:00:00') AND TIME('18:00:00') THEN 15
WHEN CURRENT_TIME() BETWEEN TIME('18:00:00') AND TIME('00:00:00') THEN 60
ELSE 360
END
THEN 'LATE'
ELSE 'OK'
END AS statut_fraicheur
FROM last_freshness
✅ Tu obtiens une table avec :source_name | age_minutes | heure_analyse | seuil_minutes | statut_fraicheur
📆 4. Orchestration (Airflow ou cron)
- Planifie un DAG ou une tâche cron qui :
- Exécute le script Python (freshness + BQ insert)
- Lance
dbt run --select monitor_freshness
- Facultatif : envoie une alerte Slack si
statut_fraicheur = 'LATE'
✅ Résumé du pipeline
Étape | Action |
---|---|
dbt source freshness | Extrait la fraîcheur réelle |
Script Python | Stocke dans source_freshness_log |
Modèle dbt monitor_freshness | Applique des seuils dynamiques |
Résultat | Table avec statut temps réel et conditionnel |
Bonus | Alertes si LATE , historisation possible |