Creates, configures, and updates Databricks Lakeflow Spark Declarative Pipelines (SDP/LDP) using serverless compute. Handles data ingestion with streaming tables, materialized views, CDC, SCD Type 2, and Auto Loader ingestion patterns. Use when building data pipelines, working with Delta Live Tables, ingesting streaming data, implementing change data capture, or when the user mentions SDP, LDP, DLT, Lakeflow pipelines, streaming tables, or bronze/silver/gold medallion architectures.
CREATE OR REFRESH for SDP objects:
- CREATE OR REFRESH STREAMING TABLE - for streaming tables
- CREATE OR REFRESH MATERIALIZED VIEW - for materialized views
- CREATE OR REPLACE - standard SQL syntax, NOT SDP syntax

Always use fully qualified names (catalog.schema.table).

| User Says | Action |
|---|---|
| "Python pipeline", "Python SDP", "use Python", "udf", "pandas", "ml inference", "pyspark" | User wants Python |
| "SQL pipeline", "SQL files", "use SQL" | User wants SQL |
| "Create a simple pipeline", "create a table", "an aggregation" | Default to SQL for simplicity |
Useful CLI commands:
- databricks fs ls dbfs:/Volumes/{catalog}/{schema}/{volume}/{path} --profile {PROFILE}
- databricks experimental aitools tools query --profile {PROFILE} --warehouse abc123 "SELECT 1 FROM catalog.schema.table"
- databricks experimental aitools tools discover-schema --profile {PROFILE} catalog.schema.table1 catalog.schema.table2
- databricks pipelines init|deploy|run|logs|stop, or use databricks pipelines --help for more options

First, determine which workflow to use:
Standalone DAB project (databricks pipelines init)

Use this when the user wants to create a new, standalone SDP project that will have its own DAB:
- No existing databricks.yml in the workspace

Use the databricks pipelines CLI commands:
databricks pipelines init --output-dir . --config-file init-config.json

Example init-config.json:
{
"project_name": "customer_pipeline",
"initial_catalog": "prod_catalog",
"use_personal_schema": "no",
"initial_language": "sql"
}

→ See 1-project-initialization.md
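When scripting project setup, a config file like the one above can be generated with the standard library before invoking the CLI. A minimal sketch, using only the field values shown in the example:

```python
import json
from pathlib import Path

# Fields mirror the example init-config.json above.
config = {
    "project_name": "customer_pipeline",
    "initial_catalog": "prod_catalog",
    "use_personal_schema": "no",
    "initial_language": "sql",
}

path = Path("init-config.json")
path.write_text(json.dumps(config, indent=2))

# Round-trip check before handing the file to the CLI.
loaded = json.loads(path.read_text())
assert loaded["initial_language"] in ("sql", "python")
```

The file is then consumed by `databricks pipelines init --config-file init-config.json`.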
Use this when the pipeline is part of an existing DAB project:
- An existing databricks.yml file in the project

→ See 1-project-initialization.md for adding pipelines to existing bundles
Use this when you need to quickly create, test, and iterate on a pipeline without managing bundle files:
→ See 2-mcp-approach.md for MCP-based workflow
Before writing pipeline code, make sure you have:
- [ ] Language selected: Python or SQL
- [ ] Read the syntax basics: **SQL**: Always Read [sql/1-syntax-basics.md](references/sql/1-syntax-basics.md), **Python**: Always Read [python/1-syntax-basics.md](references/python/1-syntax-basics.md)
- [ ] Workflow chosen: Standalone DAB / Existing DAB / MCP iteration
- [ ] Compute type: serverless (default) or classic
- [ ] Schema strategy: single schema with prefixes vs. multi-schema
- [ ] Consider [Multi-Schema Patterns](#multi-schema-patterns) and [Modern Defaults](#modern-defaults)

Then read additional guides based on what the pipeline needs, when you need it:
| If the pipeline needs... | Read |
|---|---|
| File ingestion (Auto Loader, JSON, CSV, Parquet) | references/sql/2-ingestion.md or references/python/2-ingestion.md |
| Kafka, Event Hub, or Kinesis streaming | references/sql/2-ingestion.md or references/python/2-ingestion.md |
| Deduplication, windowed aggregations, joins | references/sql/3-streaming-patterns.md or references/python/3-streaming-patterns.md |
| CDC, SCD Type 1/2, or history tracking | references/sql/4-cdc-patterns.md or references/python/4-cdc-patterns.md |
| Performance tuning, Liquid Clustering | references/sql/5-performance.md or references/python/5-performance.md |
| Concept | Details |
|---|---|
| Names | SDP = Spark Declarative Pipelines = LDP = Lakeflow Declarative Pipelines (all interchangeable) |
| SQL Syntax | CREATE OR REFRESH STREAMING TABLE, CREATE OR REFRESH MATERIALIZED VIEW |
| Python Import | from pyspark import pipelines as dp |
| Primary Decorators | @dp.table(), @dp.materialized_view(), @dp.temporary_view() |
| Legacy | Modern Replacement |
|---|---|
| import dlt | from pyspark import pipelines as dp |
| dlt.apply_changes() | dp.create_auto_cdc_flow() |
| dlt.read() / dlt.read_stream() | spark.read / spark.readStream |
| CREATE LIVE XXX | CREATE OR REFRESH STREAMING TABLE\|MATERIALIZED VIEW |
| PARTITION BY + ZORDER | CLUSTER BY (Liquid Clustering) |
| input_file_name() | _metadata.file_path |
| target parameter | schema parameter |
| Use Case | Type | Pattern |
|---|---|---|
| Windowed aggregations (tumbling, sliding, session) | Streaming Table | FROM stream(source) + GROUP BY window() |
| Full-table aggregations (totals, daily counts) | Materialized View | FROM source (no stream wrapper) |
| CDC / SCD Type 2 | Streaming Table | AUTO CDC INTO or dp.create_auto_cdc_flow() |
Use streaming tables for windowed aggregations to enable incremental processing. Use materialized views for simple aggregations that recompute fully on each refresh.
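To see why windowed aggregations stay incremental: each event maps to exactly one fixed-width bucket, so new events only touch new or recent windows. A plain-Python simulation (not pipeline code) of the bucketing that GROUP BY window() performs for a 10-minute tumbling window:

```python
from collections import Counter
from datetime import datetime, timedelta

def tumbling_window_start(ts: datetime, width: timedelta) -> datetime:
    # Floor the timestamp to the start of its fixed-width window.
    epoch = datetime(1970, 1, 1)
    return ts - ((ts - epoch) % width)

events = [
    datetime(2024, 1, 1, 9, 3),
    datetime(2024, 1, 1, 9, 7),
    datetime(2024, 1, 1, 9, 12),
]

width = timedelta(minutes=10)
counts = Counter(tumbling_window_start(ts, width) for ts in events)
# Two events land in the 09:00 window, one in the 09:10 window.
```

Because a late event only increments its own bucket, the engine can update windowed results incrementally instead of recomputing the whole table.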
After choosing your workflow (see Choose Your Workflow), determine the specific task:
Choose documentation by language:
| Task | Guide |
|---|---|
| SQL syntax basics | sql/1-syntax-basics.md |
| Data ingestion (Auto Loader, Kafka) | sql/2-ingestion.md |
| Streaming patterns (deduplication, windows) | sql/3-streaming-patterns.md |
| CDC patterns (AUTO CDC, SCD, queries) | sql/4-cdc-patterns.md |
| Performance tuning | sql/5-performance.md |
| Task | Guide |
|---|---|
| Python syntax basics | python/1-syntax-basics.md |
| Data ingestion (Auto Loader, Kafka) | python/2-ingestion.md |
| Streaming patterns (deduplication, windows) | python/3-streaming-patterns.md |
| CDC patterns (AUTO CDC, SCD, queries) | python/4-cdc-patterns.md |
| Performance tuning | python/5-performance.md |
| Task | Guide |
|---|---|
| Setting up standalone pipeline project | 1-project-initialization.md |
| Rapid iteration with MCP tools | 2-mcp-approach.md |
| Advanced configuration | 3-advanced-configuration.md |
| Migrating from DLT | 4-dlt-migration.md |
Use the pyspark.pipelines API.

| Layer | SDP Pattern | Common Practices |
|---|---|---|
| Bronze | STREAM read_files() → streaming table | Often adds _metadata.file_path, _ingested_at. Minimal transforms, append-only. |
| Silver | stream(bronze) → streaming table | Clean/validate, type casting, quality filters. Prefer DECIMAL(p,s) for money. Dedup can happen here or gold. |
| Gold | AUTO CDC INTO or materialized view | Aggregated, denormalized. SCD/dedup often via AUTO CDC. Star schema typically uses dim_*/fact_*. |
For medallion architecture (bronze/silver/gold), two approaches work:
- Prefix-based file names: bronze_*.sql, silver_*.sql, gold_*.sql
- Layer subdirectories: bronze/orders.sql, silver/cleaned.sql, gold/summary.sql

Both work with the transformations/** glob pattern. Choose based on preference or the existing project layout.
See 1-project-initialization.md for complete details on bundle initialization, migration, and troubleshooting.
SQL Example:
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS SELECT *, current_timestamp() AS _ingested_at
FROM STREAM read_files('/Volumes/catalog/schema/raw/orders/', format => 'json');

Python Example:
from pyspark import pipelines as dp
@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/...")
    )

For detailed syntax, see sql/1-syntax-basics.md or python/1-syntax-basics.md.
- Use databricks pipelines init for an Asset Bundle with multi-environment support
- Pipeline definitions live in resources/*.pipeline.yml
- Use plain .sql/.py files for the transformations - NO notebooks in your pipeline. Pipeline code must be plain files.
- Use the # Databricks notebook source format with # COMMAND ---------- separators only for ad-hoc queries. See examples/exploration_notebook.py.

Preferred: One pipeline writing to multiple schemas using fully qualified table names (catalog.schema.table). This keeps dependencies clear and is simpler to manage than multiple pipelines.
- Python: @dp.table(name="catalog.bronze_schema.orders")
- SQL: CREATE OR REFRESH STREAMING TABLE catalog.silver_schema.orders_clean AS ...

For detailed examples, see 3-advanced-configuration.md.
Fallback: If all tables must be in the same schema, use name prefixes (bronze_*, silver_*, gold_*).
After running a pipeline (via DAB or MCP), you MUST validate both the execution status AND the actual data.
From MCP (run_pipeline or create_or_update_pipeline):
- Check result["success"] and result["state"]
- On failure, inspect result["message"] and result["errors"] for details

From DAB (databricks bundle run):
- Use get_pipeline(pipeline_id=...) to get detailed status and recent events

Even if the pipeline reports SUCCESS, you MUST verify the data is correct:
# MCP Tool: get_table_stats_and_schema - validates schema, row counts, and stats
get_table_stats_and_schema(
catalog="my_catalog",
schema="my_schema",
table_names=["bronze_*", "silver_*", "gold_*"] # Use glob patterns
)

Check for:
If validation reveals problems, trace upstream to find the root cause:
1. Start from the problematic table - identify what's wrong (empty, wrong counts, bad data)
2. Check its source table - use get_table_stats_and_schema on the upstream table
3. Trace back to bronze - continue until you find where the issue originates
Common causes:
Fix the SQL/Python code, re-upload, and re-run the pipeline
Do NOT use execute_sql with COUNT queries for validation - get_table_stats_and_schema is faster and returns more information in a single call.
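The glob patterns accepted by get_table_stats_and_schema's table_names behave like shell wildcards. Assuming fnmatch-style semantics (an assumption, since the exact matcher is not documented here), table selection can be pictured as:

```python
from fnmatch import fnmatch

# Hypothetical table names for illustration.
tables = ["bronze_orders", "silver_orders_clean", "gold_daily_summary", "tmp_scratch"]
patterns = ["bronze_*", "silver_*", "gold_*"]

# Keep any table matching at least one medallion-layer pattern.
selected = [t for t in tables if any(fnmatch(t, p) for p in patterns)]
# tmp_scratch matches no pattern and is excluded
```

One patterned call therefore covers a whole layer of tables, which is why it beats issuing per-table COUNT queries.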
| Issue | Solution |
|---|---|
| Empty output tables | Use get_table_stats_and_schema to check upstream sources. Verify source files exist and paths are correct. |
| Pipeline stuck INITIALIZING | Normal for serverless, wait a few minutes |
| "Column not found" | Check schemaHints match actual data |
| Streaming reads fail | For file ingestion in a streaming table, you must use the STREAM keyword with read_files: FROM STREAM read_files(...). For table streams use FROM stream(table). See read_files — Usage in streaming tables. |
| Timeout during run | Increase timeout, or use wait_for_completion=False and check status with get_pipeline |
| MV doesn't refresh | Enable row tracking on source tables |
| SCD2: query column not found | Lakeflow uses __START_AT and __END_AT (double underscore), not START_AT/END_AT. Use WHERE __END_AT IS NULL for current rows. See sql/4-cdc-patterns.md. |
| AUTO CDC parse error at APPLY/SEQUENCE | Put APPLY AS DELETE WHEN before SEQUENCE BY. Only list columns in COLUMNS * EXCEPT (...) that exist in the source (omit _rescued_data unless bronze uses rescue data). Omit TRACK HISTORY ON * if it causes "end of input" errors; default is equivalent. See sql/4-cdc-patterns.md. |
| "Cannot create streaming table from batch query" | In a streaming table query, use FROM STREAM read_files(...) so read_files leverages Auto Loader; FROM read_files(...) alone is batch. See sql/2-ingestion.md and read_files — Usage in streaming tables. |
For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline(pipeline_id=...) which includes recent events and error details.
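The __END_AT IS NULL convention for current rows (noted in the SCD2 troubleshooting entry above) can be sketched in plain Python. The column names follow the Lakeflow __START_AT/__END_AT output described above; the row data is hypothetical:

```python
# Each dict simulates a row of an SCD Type 2 target table.
rows = [
    {"id": 1, "city": "Austin", "__START_AT": "2024-01-01", "__END_AT": "2024-06-01"},
    {"id": 1, "city": "Denver", "__START_AT": "2024-06-01", "__END_AT": None},
    {"id": 2, "city": "Boston", "__START_AT": "2024-02-01", "__END_AT": None},
]

# Equivalent of: SELECT * FROM dim WHERE __END_AT IS NULL
current = [r for r in rows if r["__END_AT"] is None]
```

Historical versions keep a populated __END_AT, so filtering on NULL end timestamps always yields exactly one current row per key.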
For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 3-advanced-configuration.md.
| Requirement | Details |
|---|---|
| Unity Catalog | Required - serverless pipelines always use UC |
| Workspace Region | Must be in serverless-enabled region |
| Serverless Terms | Must accept serverless terms of use |
| CDC Features | Requires serverless (or Pro/Advanced with classic clusters) |
| Limitation | Workaround |
|---|---|
| R language | Not supported - use classic clusters if required |
| Spark RDD APIs | Not supported - use classic clusters if required |
| JAR libraries | Not supported - use classic clusters if required |
| Maven coordinates | Not supported - use classic clusters if required |
| DBFS root access | Limited - must use Unity Catalog external locations |
| Global temp views | Not supported |
| Constraint | Details |
|---|---|
| Schema Evolution | Streaming tables require full refresh for incompatible changes |
| SQL Limitations | PIVOT clause unsupported |
| Sinks | Python only, streaming only, append flows only |
Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.