dbt: how to implement query progress callbacks in a dbt adapter


🎯 Goal

When executing long-running SQL queries, especially on large warehouses (e.g. Databricks, Trino, Presto, Snowflake), you want to:

  • Receive real-time updates (e.g. % complete, rows scanned, resource usage)
  • Optionally: log, notify, or even cancel queries based on thresholds
  • Integrate this into dbt’s execution flow via a custom adapter

🧱 Concept: Progress Callback

In Python, a callback is a function you pass into another function to be called at certain points.
In dbt adapters, you can define a callback function that listens to query progress metrics.
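As a plain-Python illustration of the pattern (nothing dbt-specific yet), a worker function can accept a callback and invoke it after each unit of work:

```python
# Minimal illustration of the callback pattern (not dbt-specific).
# run_with_progress does the "work"; on_progress is the callback it
# invokes after each step with the current completion percentage.
def run_with_progress(steps, on_progress):
    for i in range(1, steps + 1):
        # ... one unit of work would happen here ...
        on_progress(percent_complete=100 * i // steps)

def print_progress(percent_complete):
    print(f"{percent_complete}% complete")

run_with_progress(4, print_progress)  # prints 25%, 50%, 75%, 100% complete
```

The dbt version of this idea is the same shape: the adapter does the polling loop, and the callback only decides what to do with each progress snapshot.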


📦 Example: Implementing a Progress Callback in a Custom dbt Adapter

Imagine you’re building a dbt adapter for dbt-example and you want to log query progress every few seconds.

✅ Step 1: Define the progress callback function

In your adapter (e.g., plugins/example/dbt/adapters/example/connections.py):

import logging

def default_progress_callback(query_id, progress_info):
    """
    Default progress callback: log query progress via dbt's logger.
    """
    logger = logging.getLogger('dbt')
    logger.info(
        f"[{query_id}] Progress: {progress_info.get('percent_complete')}% - "
        f"Rows scanned: {progress_info.get('rows_scanned')} - "
        f"Stage: {progress_info.get('current_stage')}"
    )

✅ Step 2: Call the callback inside your execute method

The execute method is the core of a dbt adapter's query path, typically implemented in the ConnectionWrapper.

import time

import dbt.exceptions

def execute(self, sql, auto_begin=True, fetch=False, progress_callback=None):
    query_id = self._start_query(sql)

    # Fall back to the default logger-based callback if none was passed
    if progress_callback is None:
        progress_callback = default_progress_callback

    try:
        while not self._query_complete(query_id):
            progress_info = self._fetch_query_progress(query_id)
            progress_callback(query_id, progress_info)
            time.sleep(5)  # Poll every 5 seconds

        return self._fetch_query_results(query_id)

    except Exception as e:
        self._cancel_query(query_id)
        # In dbt-core >= 1.4 this class is dbt.exceptions.DbtRuntimeError
        raise dbt.exceptions.RuntimeException(f"Query failed: {e}") from e

✅ Step 3: Simulate a progress polling method

This simulates querying the warehouse API for progress:

def _fetch_query_progress(self, query_id):
    # Simulate querying warehouse API
    return {
        "percent_complete": 65,
        "rows_scanned": 4500000,
        "current_stage": "shuffle_write"
    }

In a real adapter, this could query:

  • The QUERY_HISTORY table function (INFORMATION_SCHEMA) on Snowflake
  • REST API on Databricks
  • /v1/query on Trino
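As a concrete sketch of what a real polling method might do, here is a parser that maps a Trino-style /v1/query status payload onto the progress_info dict the callback expects. The field names (queryStats, completedDrivers, processedRows) are modeled loosely on Trino's response and should be treated as assumptions; check the payload your cluster actually returns.

```python
def parse_trino_progress(payload):
    """Map a Trino-style query status payload onto the progress_info
    dict used by the callback. Field names are assumptions modeled
    loosely on Trino's /v1/query response; verify against your cluster.
    """
    stats = payload.get("queryStats", {})
    total = stats.get("totalDrivers") or 0
    done = stats.get("completedDrivers") or 0
    # Driver completion is a rough proxy for percent complete
    percent = round(100 * done / total) if total else 0
    return {
        "percent_complete": percent,
        "rows_scanned": stats.get("processedRows"),
        "current_stage": payload.get("state"),
    }
```

A real _fetch_query_progress would fetch this JSON over HTTP and then hand the parsed dict straight to the callback.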

✅ Step 4: Use it in dbt during execution

Now when you run:

dbt run --select long_running_model

Your logs might show:

12:30:05  [q12345] Progress: 10% - Rows scanned: 500K - Stage: parsing
12:30:10  [q12345] Progress: 40% - Rows scanned: 3M - Stage: filtering
12:30:15  [q12345] Progress: 85% - Rows scanned: 10M - Stage: writing output

✅ Benefits of This Pattern

  • ⏱ Real-time feedback: see how far a query has progressed
  • 📊 Metrics tracking: rows scanned, stages, bytes shuffled, etc.
  • ⚠️ Early warnings: detect expensive or stuck queries early
  • 🔌 Plug-and-play: works with any dbt adapter that supports it
  • 🚫 Optional cancellation: cancel slow/stuck queries in CI/CD or a UI

🧪 Optional Extensions

  • 💬 Send progress to Slack / webhook
  • ⏰ Estimate time remaining (ETA ≈ elapsed_time × (100 − percent_complete) / percent_complete)
  • 💥 Add thresholds: auto-cancel if query scans > 100GB
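The threshold idea can be sketched as a wrapper callback that raises once a scan budget is exceeded, so the execute method's except block cancels the query. ScanBudgetExceeded, the bytes_scanned key, and the 100 GB figure are all illustrative assumptions, not part of any dbt API.

```python
# Sketch of a threshold-aware callback. The exception type, the
# `bytes_scanned` key, and the budget are illustrative assumptions.
class ScanBudgetExceeded(Exception):
    pass

def make_budget_callback(max_bytes, inner=None):
    """Wrap an optional inner callback and raise once the query's
    scan volume exceeds max_bytes."""
    def callback(query_id, progress_info):
        if inner:
            inner(query_id, progress_info)  # e.g. the default logger
        scanned = progress_info.get("bytes_scanned") or 0
        if scanned > max_bytes:
            raise ScanBudgetExceeded(
                f"{query_id}: scanned {scanned} bytes, budget {max_bytes}"
            )
    return callback
```

Passing make_budget_callback(100 * 1024**3) as the progress_callback would cancel any query that scans more than roughly 100 GB, because the raised exception lands in the except block that calls _cancel_query.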

🧼 Summary

  • A progress callback is a function that gets real-time query execution metrics.
  • You inject it into the dbt adapter’s execute function.
  • This makes it easy to monitor, log, or act on long-running queries.
  • Works best with data warehouses that expose rich query metadata (Snowflake, Databricks, Trino, etc.)
