Handling Evolving Source Schemas in dbt with Dynamic Column Selection


When working with external data sources (like APIs, third-party tables, or partner feeds), the schema often changes over time:

  • New columns may appear
  • Old ones may disappear
  • Data types may change

Hardcoding column names in your SELECT statements can easily break your models or cause silent issues.
βœ… A better approach: dynamic column selection + schema validation.


🧠 What does this mean?

  • Dynamic column selection: Only select the columns that actually exist and are needed.
  • Schema validation: Check that all required columns are present and have the expected types.

This pattern makes your dbt models more robust, flexible, and self-healing.


πŸ“¦ Example Use Case

You’re reading from a raw source table: raw_api.customer_data

The upstream schema can change without notice, but your model only depends on:

customer_id, email, created_at
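
If the source isn't declared yet, it can be registered with a standard dbt sources file. The paths and schema name below are assumptions chosen to match the example; adjust them to your project:

```yaml
# models/staging/sources.yml (hypothetical location and schema name)
version: 2

sources:
  - name: raw_api
    schema: raw_api          # wherever the raw feed actually lands
    tables:
      - name: customer_data  # upstream schema may drift; validated at run time
```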

βœ… Step 1: Define required columns and types in a macro

In macros/utils/validate_schema.sql:

{% macro get_required_columns() %}
    {# Use return() so callers receive a real dict; without it, the macro
       would render to a JSON-looking string and .items() would fail. #}
    {{ return({
        "customer_id": "string",
        "email": "string",
        "created_at": "timestamp"
    }) }}
{% endmacro %}

βœ… Step 2: Write a macro to validate and select only available columns

{% macro safe_select(source_relation) %}
    {# During parsing dbt has no database connection, so introspection would
       return nothing and the checks below would fail spuriously. Fall back
       to a harmless wildcard; real validation runs at compile/run time. #}
    {% if not execute %}
        {{ return('*') }}
    {% endif %}

    {% set required_columns = get_required_columns() %}
    {% set actual_columns = adapter.get_columns_in_relation(source_relation) %}

    {% set actual_column_names = actual_columns | map(attribute='name') | list %}
    {% set missing_columns = [] %}
    {% set selected_columns = [] %}

    {% for column, expected_type in required_columns.items() %}
        {% if column in actual_column_names %}
            {% set actual_type = (
              actual_columns | selectattr('name', 'equalto', column)
                             | map(attribute='data_type') | list
            )[0] %}
            {# Case-insensitive substring match, so "timestamp" also accepts
               e.g. "TIMESTAMP_NTZ". Align the expected names with the type
               names your adapter actually reports. #}
            {% if expected_type | lower not in actual_type | lower %}
                {{ exceptions.raise_compiler_error(
                    "Column '" ~ column ~ "' exists but has unexpected type: expected '"
                    ~ expected_type ~ "', got '" ~ actual_type ~ "'."
                ) }}
            {% endif %}
            {% do selected_columns.append(column) %}
        {% else %}
            {% do missing_columns.append(column) %}
        {% endif %}
    {% endfor %}

    {% if missing_columns | length > 0 %}
        {{ exceptions.raise_compiler_error("Missing required columns: " ~ missing_columns | join(', ')) }}
    {% endif %}

    {{ return(selected_columns | join(', ')) }}
{% endmacro %}

βœ… Step 3: Use it in a dbt model

-- models/staging/stg_customer_data.sql

select
  {{ safe_select(source('raw_api', 'customer_data')) }}
from {{ source('raw_api', 'customer_data') }}

🧼 What happens here?

  • The macro reads the actual source table schema (get_columns_in_relation).
  • It validates:
    • Are all required columns present?
    • Do they match the expected types?
  • It dynamically generates the SELECT clause with only those valid columns.
  • If something is wrong, it fails early with a clear message during dbt run.
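
When validation passes, the macro compiles away entirely. The rendered SQL for the model above would look roughly like this (the fully qualified table name is illustrative and depends on your target database and schema):

```sql
-- sketch of the compiled output in target/compiled/...
select
  customer_id, email, created_at
from "analytics"."raw_api"."customer_data"
```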

βœ… Benefits

  • πŸ”Ž Dynamic selection: selects only valid, needed columns at runtime.
  • πŸ›‘οΈ Schema validation: fails fast when columns are missing or types change.
  • πŸ”„ Future-proof: new columns can be added upstream without breaking your models.
  • πŸ“‹ Clean, maintainable models: avoids hardcoding column names repeatedly.
  • πŸ“£ Clear error messages: easier debugging during schema drift.

πŸ§ͺ Bonus: Test schema drift in CI

Use this macro in a CI job to catch issues before deployment by simulating schema changes or running on a staging source.
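
As a sketch, a CI step only needs to compile the project against the staging source for the macro's checks to fire, since dbt sets `execute` to true during compilation. The workflow file, adapter package, and target name below are assumptions:

```yaml
# .github/workflows/schema-drift.yml (hypothetical)
name: schema-drift-check
on: [pull_request]

jobs:
  compile:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dbt-core dbt-snowflake   # pick your adapter
      # dbt compile renders safe_select against the live staging schema,
      # so missing columns or type changes fail the build before deploy.
      - run: dbt compile --target staging
```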

