En anglais : “Schema validation with strict required fields and fallback handling”
🧠 Problème à résoudre
- Tu consommes une API externe (ex: produits, utilisateurs…).
- Le schéma peut évoluer : champs ajoutés, renommés, supprimés…
- Tu veux :
- ✅ valider que les champs essentiels sont là (nom, id, date…),
- 🆗 tolérer que certains champs facultatifs soient absents,
- ❌ alerter/loguer quand il y a un vrai souci de contrat cassé.
✅ Stratégie : “strict subset validation with fallback”
- Tu définis un sous-ensemble de colonnes obligatoires (
required_fields
). - Tu autorises les autres champs à être manquants sans que le pipeline échoue.
- Tu logues les cas problématiques (ex: via
dbt_utils
ou des macros personnalisées).
🔨 1. Exemple simple avec dbt : validation dans un staging model
API JSON reçue (par exemple via ingestion Airbyte, Fivetran, ou Databricks) :
{
"id": 123,
"name": "Produit A",
"category": "Livres",
"weight": 1.2,
"new_field": "non_documenté"
}
Mais parfois name
ou category
est absent…
🎯 Objectif : valider que id
, name
, category
existent.
📁 stg_produits.sql
— Staging avec fallback
with raw as (
select *
from {{ source('api', 'produits') }}
),
validated as (
select
id,
-- fallback si `name` manquant
coalesce(name, 'NOM_INCONNU') as name,
-- fallback si `category` manquant
coalesce(category, 'CATÉGORIE_INCONNUE') as category,
-- champ optionnel (on l’ignore s’il n’existe pas)
weight
from raw
where id is not null
)
select * from validated
🔍 2. Macro de validation stricte + fallback
📄 macros/validate_required_columns.sql
{% macro validate_required_columns(model_name, required_fields) %}
{% set cols = adapter.get_columns_in_relation(ref(model_name)) %}
{% set col_names = cols | map(attribute='name') | list %}
{% for field in required_fields %}
{% if field not in col_names %}
{% do exceptions.raise("🚨 Le champ requis '" ~ field ~ "' est manquant dans le modèle " ~ model_name) %}
{% endif %}
{% endfor %}
{% endmacro %}
📄 Utilisation dans un modèle :
{{ validate_required_columns('stg_produits', ['id', 'name', 'category']) }}
📈 3. Logging non-bloquant pour suivi
Tu peux aussi loguer les violations sans casser le run :
{% if 'name' not in col_names %}
{% do log("⚠️ Champ 'name' manquant dans stg_produits", info=True) %}
{% endif %}
✅ Avantages de cette approche
Aspect | Bénéfice |
---|---|
🔐 Sécurité des données | Les champs clés sont garantis d’être présents |
🔄 Évolutivité | Les nouveaux champs ne cassent pas le pipeline |
🧪 Monitoring contractuel | Tu peux détecter les changements inattendus (alertes ou logs) |
🧼 Code lisible | Les fallback sont explicites, tu sais ce que tu tolères |
🛠️ Variante avancée : test automatisé
Avec le package dbt_expectations, tu peux écrire :
- name: stg_produits
tests:
- dbt_expectations.expect_column_to_exist:
column_name: id
- dbt_expectations.expect_column_to_exist:
column_name: name
- dbt_expectations.expect_column_to_exist:
column_name: category
Parfait Walid 👌
Voici un mini-projet dbt complet prêt à copier-coller pour gérer une source API externe avec fallback + validation + tests.
📁 Structure du projet
my_dbt_project/
├── dbt_project.yml
├── models/
│ ├── staging/
│ │ └── stg_produits.sql
├── macros/
│ └── validate_required_columns.sql
├── tests/
│ └── produits_columns_test.yml
└── sources.yml
🔧 1. dbt_project.yml
name: my_dbt_project
version: 1.0
profile: default
vars:
current_tenant: "default"
models:
my_dbt_project:
staging:
+materialized: view
🧾 2. models/staging/stg_produits.sql
{{ validate_required_columns('stg_produits', ['id', 'name', 'category']) }}
with raw as (
select *
from {{ source('api', 'produits') }}
),
validated as (
select
id,
coalesce(name, 'NOM_INCONNU') as name,
coalesce(category, 'CATÉGORIE_INCONNUE') as category,
weight -- optionnel
from raw
where id is not null
)
select * from validated
📚 3. macros/validate_required_columns.sql
{% macro validate_required_columns(model_name, required_fields) %}
{% set cols = adapter.get_columns_in_relation(ref(model_name)) %}
{% set col_names = cols | map(attribute='name') | list %}
{% for field in required_fields %}
{% if field not in col_names %}
{% do exceptions.raise("🚨 Le champ requis '" ~ field ~ "' est manquant dans le modèle " ~ model_name) %}
{% endif %}
{% endfor %}
{% endmacro %}
🔗 4. sources.yml
version: 2
sources:
- name: api
schema: raw_api
tables:
- name: produits
description: "Données brutes provenant de l’API externe"
🧪 5. tests/produits_columns_test.yml
version: 2
models:
- name: stg_produits
tests:
- dbt_expectations.expect_column_to_exist:
column_name: id
- dbt_expectations.expect_column_to_exist:
column_name: name
- dbt_expectations.expect_column_to_exist:
column_name: category
🚀 6. Exécution complète
# Compilation
dbt debug
# Build du modèle
dbt run --select stg_produits
# Test du schéma
dbt test --select stg_produits
📌 Astuces bonus
- Tu peux injecter un fichier JSON API simulé dans
raw_api.produits
avec dbt seed ou un script Python. - Si tu veux gérer plusieurs tenants ou formats d’API, tu peux combiner avec des variables + logique par
var('source_type')
.