🎯 Goal
When executing long-running SQL queries, especially on large warehouses (e.g. Databricks, Trino, Presto, Snowflake), you want to:
- Receive real-time updates (e.g. % complete, rows scanned, resource usage)
- Optionally: log, notify, or even cancel queries based on thresholds
- Integrate this into dbt’s execution flow via a custom adapter
🧱 Concept: Progress Callback
In Python, a callback is a function you pass into another function to be called at certain points.
In dbt adapters, you can define a callback function that listens to query progress metrics.
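As a minimal illustration (the names here are purely for demonstration), a worker function simply invokes whatever callable it was handed:

```python
def on_progress(percent):
    """Called by the worker at each checkpoint."""
    print(f"{percent}% complete")

def run_job(callback):
    # The worker reports progress by invoking the callable it was given.
    for percent in (25, 50, 75, 100):
        callback(percent)

run_job(on_progress)  # prints "25% complete" ... "100% complete"
```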
📦 Example: Implementing a Progress Callback in a Custom dbt Adapter
Imagine you’re building a `dbt-example` adapter and you want to log query progress every few seconds.
✅ Step 1: Define the progress callback function
In your adapter (e.g., `plugins/example/dbt/adapters/example/connections.py`):
```python
import logging


def default_progress_callback(query_id, progress_info):
    """Default progress callback: log query progress."""
    logger = logging.getLogger('dbt')
    logger.info(
        f"[{query_id}] Progress: {progress_info.get('percent_complete')}% - "
        f"Rows scanned: {progress_info.get('rows_scanned')} - "
        f"Stage: {progress_info.get('current_stage')}"
    )
```
✅ Step 2: Call the callback inside your `execute` method
This is the core of a dbt adapter, typically implemented in the `ConnectionWrapper`:
```python
import time

import dbt.exceptions


def execute(self, sql, auto_begin=True, fetch=False, progress_callback=None):
    query_id = self._start_query(sql)

    # Fall back to the default logger-based callback if none was passed.
    if progress_callback is None:
        progress_callback = default_progress_callback

    try:
        # Poll the warehouse until the query finishes, reporting
        # progress to the callback on every iteration.
        while not self._query_complete(query_id):
            progress_info = self._fetch_query_progress(query_id)
            progress_callback(query_id, progress_info)
            time.sleep(5)  # Poll every 5 seconds

        return self._fetch_query_results(query_id)
    except Exception as e:
        # Don't leave an abandoned query running on the warehouse.
        self._cancel_query(query_id)
        raise dbt.exceptions.RuntimeException(f"Query failed: {str(e)}")
```
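Because the callback is just a parameter, callers can swap in their own behavior. A hypothetical example (`connection` stands in for however your adapter exposes the wrapper, and `logging` is imported as in Step 1):

```python
def warn_on_slow_scan(query_id, progress_info):
    # Custom callback: escalate to a warning once the scan gets large.
    rows = progress_info.get("rows_scanned") or 0
    if rows > 10_000_000:
        logging.getLogger("dbt").warning(
            f"[{query_id}] Large scan in progress: {rows:,} rows"
        )

connection.execute("SELECT ...", progress_callback=warn_on_slow_scan)
```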
✅ Step 3: Simulate a progress polling method
This simulates querying the warehouse API for progress:
```python
def _fetch_query_progress(self, query_id):
    # Simulate querying the warehouse API for progress metadata.
    return {
        "percent_complete": 65,
        "rows_scanned": 4500000,
        "current_stage": "shuffle_write",
    }
```
In a real adapter, this could query:
- Snowflake: the `INFORMATION_SCHEMA.QUERY_HISTORY()` table function
- Databricks: the REST API
- Trino: the `/v1/query/{queryId}` REST endpoint
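For instance, a Trino-flavored version might poll the coordinator's REST endpoint. This is only a sketch: `self.host` and `self.port` are assumed attributes, it uses the third-party `requests` library, and the JSON field names should be verified against your Trino version's QueryInfo schema:

```python
import requests  # assumption: available in the adapter's environment


def _fetch_query_progress(self, query_id):
    # Poll the coordinator for query metadata (hypothetical host/port attrs).
    resp = requests.get(f"http://{self.host}:{self.port}/v1/query/{query_id}")
    resp.raise_for_status()
    info = resp.json()
    stats = info.get("queryStats", {})
    return {
        "percent_complete": stats.get("progressPercentage"),
        "rows_scanned": stats.get("rawInputPositions"),
        "current_stage": info.get("state"),
    }
```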
✅ Step 4: Use it in dbt during execution
Now when you run:

```bash
dbt run --select long_running_model
```

your logs might show:

```
12:30:05 [q12345] Progress: 10% - Rows scanned: 500K - Stage: parsing
12:30:10 [q12345] Progress: 40% - Rows scanned: 3M - Stage: filtering
12:30:15 [q12345] Progress: 85% - Rows scanned: 10M - Stage: writing output
```
✅ Benefits of This Pattern
| Feature | Benefit |
|---|---|
| ⏱ Real-time feedback | See how far a query has progressed |
| 📊 Metrics tracking | Rows scanned, stages, bytes shuffled, etc. |
| ⚠️ Early warnings | Detect expensive or stuck queries early |
| 🔌 Plug-and-play | Works with any dbt adapter that supports it |
| 🚫 Optional cancellation | Cancel slow/stuck queries in CI/CD or UI |
🧪 Optional Extensions
- 💬 Send progress to Slack / webhook
- ⏰ Estimate time remaining (`remaining_time ≈ elapsed_time * (100 - percent_complete) / percent_complete`)
- 💥 Add thresholds: auto-cancel if a query scans more than 100 GB (see the sketch below)
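A minimal sketch combining the last two ideas, assuming the `progress_info` keys from Step 3 plus a hypothetical `bytes_scanned` field, and a `cancel` callable supplied by the adapter:

```python
import time

GB = 1024 ** 3


def make_threshold_callback(cancel, max_bytes=100 * GB):
    """Build a callback that estimates time remaining and cancels
    queries that scan too much data. `cancel` is a hypothetical
    callable, e.g. lambda: self._cancel_query(query_id)."""
    start = time.monotonic()

    def callback(query_id, progress_info):
        percent = progress_info.get("percent_complete") or 0
        if percent > 0:
            elapsed = time.monotonic() - start
            remaining = elapsed * (100 - percent) / percent
            print(f"[{query_id}] ~{remaining:.0f}s remaining")

        # `bytes_scanned` is not in the Step 3 example; shown for illustration.
        if (progress_info.get("bytes_scanned") or 0) > max_bytes:
            print(f"[{query_id}] scan exceeded {max_bytes} bytes; cancelling")
            cancel()

    return callback
```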
🧼 Summary
- A progress callback is a function that gets real-time query execution metrics.
- You inject it into the dbt adapter’s `execute` method.
- This makes it easy to monitor, log, or act on long-running queries.
- It works best with data warehouses that expose rich query metadata (Snowflake, Databricks, Trino, etc.).