đ§ Contexte
Tu dĂ©clares une source dans dbt (par exemple : une table dans ton data warehouse), et tu veux surveiller si elle est fraĂźche, câest-Ă -dire :
- quâelle a bien Ă©tĂ© mise Ă jour rĂ©cemment
- quâelle respecte une frĂ©quence dâalimentation attendue
Mais parfois, une mĂȘme source est alimentĂ©e par plusieurs pipelines diffĂ©rents :
Exemple de table source | Processus de chargement | Fréquence |
---|---|---|
raw.transactions | ingestion par API temps réel | toutes les 15 minutes |
batch de consolidation backend | 1 fois par heure |
đŻ ProblĂšme : une seule dĂ©finition freshness
standard ne suffit pas
Si tu fais simplement :
freshness:
error_after: { count: 30, period: minute }
warn_after: { count: 15, period: minute }
â Tu appliques une rĂšgle unique Ă une source potentiellement multi-origine. Tu risques :
- soit de déclencher des alertes inutiles,
- soit de ne pas dĂ©tecter le vrai problĂšme (ex : si la partie “batch” est en retard mais que les donnĂ©es arrivent toujours cĂŽtĂ© API).
â Solution : plusieurs blocs freshness avec critĂšres diffĂ©rents
dbt permet de configurer plusieurs rĂšgles de fraĂźcheur (avec condition), dans une mĂȘme source, pour mieux reflĂ©ter la rĂ©alitĂ© mĂ©tier.
â ïž Attention : dbt ne supporte pas nativement plusieurs blocs freshness
par source, mais tu peux contourner cela intelligemment en :
- créant plusieurs tables
source
logiques qui pointent vers la mĂȘme table physique - ou en utilisant une macro customisĂ©e pour la validation conditionnelle selon lâheure, la partition ou lâenvironnement
đ§ Exemple 1 : dĂ©finir deux sources
logiques pour la mĂȘme table physique
version: 2
sources:
- name: app_sources
schema: raw
tables:
- name: transactions_api
identifier: transactions
freshness:
warn_after: { count: 15, period: minute }
error_after: { count: 30, period: minute }
loaded_at_field: updated_at
meta:
origin: api
- name: transactions_batch
identifier: transactions
freshness:
warn_after: { count: 1, period: hour }
error_after: { count: 2, period: hour }
loaded_at_field: updated_at
meta:
origin: batch
âĄïž Tu dĂ©clares deux entrĂ©es source
, mais elles pointent vers la mĂȘme table transactions
dans le warehouse.
đ§Ș Tu peux ensuite surveiller chaque “flux” :
dbt source freshness --select source:app_sources.transactions_api
dbt source freshness --select source:app_sources.transactions_batch
đ§© Exemple 2 : logique conditionnelle dans une macro
Si tu veux une logique dĂ©pendante de lâheure (ex : tolĂ©rance plus large la nuit), tu peux crĂ©er un test personnalisĂ© ou une macro :
{% macro validate_transactions_freshness(source_name, max_delay_minutes) %}
{% set now = modules.datetime.datetime.now() %}
{% set allowed_delay = modules.datetime.timedelta(minutes=max_delay_minutes) %}
{% set freshness = ... %} {# lire le MAX(updated_at) #}
{% if now - freshness > allowed_delay %}
{{ exceptions.raise_compiler_error("Freshness check failed for " ~ source_name) }}
{% endif %}
{% endmacro %}
Et tu appelles cette macro via un dbt run-operation
.
â Avantages de cette stratĂ©gie
Bénéfice | Détail |
---|---|
đŻ Suivi prĂ©cis de chaque flux de chargement | Tu sais si l’API ou le batch est en retard |
𧩠Une seule définition source réutilisable | Tu gardes la maintenance facile |
đ FlexibilitĂ© horaire ou mĂ©tier | Tu peux adapter la rĂšgle Ă lâheure, au type, Ă lâenv… |
đ Moins de fausses alertes | Chaque flux a sa propre tolĂ©rance de dĂ©lai |
đ§ RĂ©sumĂ©
ĂlĂ©ment | Explication |
---|---|
freshness: | Permet de vérifier si une table source est à jour |
Cas multi-flux | Tu dois surveiller chaque logique de chargement individuellement |
Solution 1 | Déclarer plusieurs entrées source avec identifier commun |
Solution 2 | Utiliser une macro conditionnelle ou personnalisée |
Avantage | Monitoring plus fin, détection plus fiable des incidents |