Validation de schéma avec champs obligatoires stricts et gestion de secours

En anglais : “Schema validation with strict required fields and fallback handling”


🧠 Problème à résoudre

  • Tu consommes une API externe (ex: produits, utilisateurs…).
  • Le schéma peut évoluer : champs ajoutés, renommés, supprimés…
  • Tu veux :
    • valider que les champs essentiels sont là (nom, id, date…),
    • 🆗 tolérer que certains champs facultatifs soient absents,
    • alerter/loguer quand il y a un vrai souci de contrat cassé.

✅ Stratégie : “strict subset validation with fallback”

  1. Tu définis un sous-ensemble de colonnes obligatoires (required_fields).
  2. Tu autorises les autres champs à être manquants sans que le pipeline échoue.
  3. Tu logues les cas problématiques (ex: via dbt_utils ou des macros personnalisées).

🔨 1. Exemple simple avec dbt : validation dans un staging model

API JSON reçue (par exemple via ingestion Airbyte, Fivetran, ou Databricks) :

{
  "id": 123,
  "name": "Produit A",
  "category": "Livres",
  "weight": 1.2,
  "new_field": "non_documenté"
}

Mais parfois name ou category est absent…


🎯 Objectif : valider que id, name, category existent.


📁 stg_produits.sql — Staging avec fallback

with raw as (
    select *
    from {{ source('api', 'produits') }}
),

validated as (

    select
        id,
        -- fallback si `name` manquant
        coalesce(name, 'NOM_INCONNU') as name,

        -- fallback si `category` manquant
        coalesce(category, 'CATÉGORIE_INCONNUE') as category,

        -- champ optionnel (on l’ignore s’il n’existe pas)
        weight

    from raw
    where id is not null
)

select * from validated

🔍 2. Macro de validation stricte + fallback

📄 macros/validate_required_columns.sql

{% macro validate_required_columns(model_name, required_fields) %}
    {% set cols = adapter.get_columns_in_relation(ref(model_name)) %}
    {% set col_names = cols | map(attribute='name') | list %}

    {% for field in required_fields %}
        {% if field not in col_names %}
            {% do exceptions.raise("🚨 Le champ requis '" ~ field ~ "' est manquant dans le modèle " ~ model_name) %}
        {% endif %}
    {% endfor %}
{% endmacro %}

📄 Utilisation dans un modèle :

{{ validate_required_columns('stg_produits', ['id', 'name', 'category']) }}

📈 3. Logging non-bloquant pour suivi

Tu peux aussi loguer les violations sans casser le run :

{% if 'name' not in col_names %}
    {% do log("⚠️ Champ 'name' manquant dans stg_produits", info=True) %}
{% endif %}

✅ Avantages de cette approche

AspectBénéfice
🔐 Sécurité des donnéesLes champs clés sont garantis d’être présents
🔄 ÉvolutivitéLes nouveaux champs ne cassent pas le pipeline
🧪 Monitoring contractuelTu peux détecter les changements inattendus (alertes ou logs)
🧼 Code lisibleLes fallback sont explicites, tu sais ce que tu tolères

🛠️ Variante avancée : test automatisé

Avec le package dbt_expectations, tu peux écrire :

- name: stg_produits
  tests:
    - dbt_expectations.expect_column_to_exist:
        column_name: id
    - dbt_expectations.expect_column_to_exist:
        column_name: name
    - dbt_expectations.expect_column_to_exist:
        column_name: category

Parfait Walid 👌
Voici un mini-projet dbt complet prêt à copier-coller pour gérer une source API externe avec fallback + validation + tests.


📁 Structure du projet

my_dbt_project/
├── dbt_project.yml
├── models/
│   ├── staging/
│   │   └── stg_produits.sql
├── macros/
│   └── validate_required_columns.sql
├── tests/
│   └── produits_columns_test.yml
└── sources.yml

🔧 1. dbt_project.yml

name: my_dbt_project
version: 1.0
profile: default

vars:
  current_tenant: "default"

models:
  my_dbt_project:
    staging:
      +materialized: view

🧾 2. models/staging/stg_produits.sql

{{ validate_required_columns('stg_produits', ['id', 'name', 'category']) }}

with raw as (
    select *
    from {{ source('api', 'produits') }}
),

validated as (

    select
        id,
        coalesce(name, 'NOM_INCONNU') as name,
        coalesce(category, 'CATÉGORIE_INCONNUE') as category,
        weight  -- optionnel
    from raw
    where id is not null

)

select * from validated

📚 3. macros/validate_required_columns.sql

{% macro validate_required_columns(model_name, required_fields) %}
    {% set cols = adapter.get_columns_in_relation(ref(model_name)) %}
    {% set col_names = cols | map(attribute='name') | list %}

    {% for field in required_fields %}
        {% if field not in col_names %}
            {% do exceptions.raise("🚨 Le champ requis '" ~ field ~ "' est manquant dans le modèle " ~ model_name) %}
        {% endif %}
    {% endfor %}
{% endmacro %}

🔗 4. sources.yml

version: 2

sources:
  - name: api
    schema: raw_api
    tables:
      - name: produits
        description: "Données brutes provenant de l’API externe"

🧪 5. tests/produits_columns_test.yml

version: 2

models:
  - name: stg_produits
    tests:
      - dbt_expectations.expect_column_to_exist:
          column_name: id
      - dbt_expectations.expect_column_to_exist:
          column_name: name
      - dbt_expectations.expect_column_to_exist:
          column_name: category

🚀 6. Exécution complète

# Compilation
dbt debug

# Build du modèle
dbt run --select stg_produits

# Test du schéma
dbt test --select stg_produits

📌 Astuces bonus

  • Tu peux injecter un fichier JSON API simulé dans raw_api.produits avec dbt seed ou un script Python.
  • Si tu veux gérer plusieurs tenants ou formats d’API, tu peux combiner avec des variables + logique par var('source_type').

Leave a Reply

Your email address will not be published. Required fields are marked *