When working with external data sources (like APIs, third-party tables, or partner feeds), the schema often changes over time:
- New columns may appear,
- Old ones may disappear,
- Or data types may change.
Hardcoding column names in your SELECT statements can easily break your models or cause silent issues.

✅ A better approach: dynamic column selection + schema validation.
🧠 What does this mean?
- Dynamic column selection: Only select the columns that actually exist and are needed.
- Schema validation: Check that all required columns are present and have the expected types.
This pattern makes your dbt models more robust, flexible, and self-healing.
📦 Example Use Case
You're reading from a raw source table: raw_api.customer_data
The upstream schema can change without notice, but your model only depends on:
`customer_id`, `email`, `created_at`
✅ Step 1: Define required columns and types in a macro
In `macros/utils/validate_schema.sql`:
```sql
{% macro get_required_columns() %}
    {# Use return() so callers get an actual dict;
       a bare {...} body would just render as text #}
    {% do return({
        "customer_id": "string",
        "email": "string",
        "created_at": "timestamp"
    }) %}
{% endmacro %}
```
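To eyeball the mapping without running a model, a tiny helper macro (the name `print_required_columns` is just illustrative) can log it via `dbt run-operation`:

```sql
-- macros/utils/print_required_columns.sql (illustrative helper)
{% macro print_required_columns() %}
    {# Logs the required-columns dict to stdout #}
    {% do log(get_required_columns(), info=true) %}
{% endmacro %}
```

Then run `dbt run-operation print_required_columns` from the project root.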
✅ Step 2: Write a macro to validate and select only available columns
```sql
{% macro safe_select(source_relation) %}
    {% set required_columns = get_required_columns() %}
    {% set actual_columns = adapter.get_columns_in_relation(source_relation) %}

    {# Normalize names to lowercase so matching works across adapters
       (Snowflake, for example, reports uppercase identifiers) #}
    {% set actual_types = {} %}
    {% for col in actual_columns %}
        {% do actual_types.update({col.name | lower: col.data_type}) %}
    {% endfor %}

    {% set missing_columns = [] %}
    {% set selected_columns = [] %}

    {% for column, expected_type in required_columns.items() %}
        {% if column in actual_types %}
            {% set actual_type = actual_types[column] %}
            {# Compare types case-insensitively (e.g. TIMESTAMP vs timestamp) #}
            {% if expected_type | lower not in actual_type | lower %}
                {{ exceptions.raise_compiler_error("Column '" ~ column ~ "' exists but has unexpected type: expected '" ~ expected_type ~ "', got '" ~ actual_type ~ "'.") }}
            {% endif %}
            {% do selected_columns.append(column) %}
        {% else %}
            {% do missing_columns.append(column) %}
        {% endif %}
    {% endfor %}

    {% if missing_columns | length > 0 %}
        {{ exceptions.raise_compiler_error("Missing required columns: " ~ missing_columns | join(', ')) }}
    {% endif %}

    {{ return(selected_columns | join(', ')) }}
{% endmacro %}
```
✅ Step 3: Use it in a dbt model

```sql
-- models/staging/stg_customer_data.sql
select
    {{ safe_select(source('raw_api', 'customer_data')) }}
from {{ source('raw_api', 'customer_data') }}
```
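Assuming the upstream table currently exposes all three required columns, the compiled model ends up as plain, explicit SQL (illustrative output; exact whitespace varies by dbt version):

```sql
-- target/compiled/.../stg_customer_data.sql (illustrative)
select
    customer_id, email, created_at
from raw_api.customer_data
```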
🧼 What happens here?
- The macro reads the actual source table schema via `get_columns_in_relation`.
- It validates:
  - Are all required columns present?
  - Do they match the expected types?
- It dynamically generates the SELECT clause with only those valid columns.
- If something is wrong, it fails early with a clear message during `dbt run`.
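For example, if the upstream feed silently drops `email`, `dbt run` aborts at compile time with a message along these lines (paraphrased; exact wording varies by dbt version):

```
Compilation Error in model stg_customer_data (models/staging/stg_customer_data.sql)
  Missing required columns: email
```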
✅ Benefits

| Feature | Description |
|---|---|
| 🔍 Dynamic selection | Selects only valid, needed columns at runtime. |
| 🛡️ Schema validation | Fails fast when columns are missing or types change. |
| 🔄 Future-proof | Allows new columns to be added without breaking your models. |
| 📝 Clean, maintainable models | Avoids hardcoding column names repeatedly. |
| 📣 Clear error messages | Easier debugging during schema drift. |
🧪 Bonus: Test schema drift in CI
Use this macro in a CI job to catch issues before deployment by simulating schema changes or running on a staging source.
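One lightweight way to do that is a dedicated run-operation macro that exercises the validation without building anything; the macro name and the hard-coded schema/table below are assumptions for illustration:

```sql
-- macros/utils/assert_customer_schema.sql (illustrative CI helper)
{% macro assert_customer_schema() %}
    {# Resolve the raw table directly so this works via `dbt run-operation` #}
    {% set rel = adapter.get_relation(
        database=target.database,
        schema='raw_api',
        identifier='customer_data'
    ) %}
    {% if rel is none %}
        {{ exceptions.raise_compiler_error("raw_api.customer_data not found") }}
    {% endif %}
    {# Reuse safe_select purely for its validation side effects #}
    {% do safe_select(rel) %}
    {% do log("Schema check passed for raw_api.customer_data", info=true) %}
{% endmacro %}
```

A CI step can then run `dbt run-operation assert_customer_schema` against the staging target and fail the pipeline on drift before anything is deployed.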