Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation.
71
87%
Does it follow best practices?
Impact
—
No eval scenarios have been run
Passed
No known issues
FIRST: Use the parent databricks-core skill for CLI basics, authentication, profile selection, and data discovery commands.
Use this tree to determine which dataset type and features to use. Multiple features can apply to the same dataset — e.g., a Streaming Table can use Auto Loader for ingestion, Append Flows for fan-in, and Expectations for data quality. Choose the dataset type first, then layer on applicable features.
User request → What kind of output?
├── Intermediate/reusable logic (not persisted) → Temporary View
│ ├── Preprocessing/filtering before Auto CDC → Temporary View feeding CDC flow
│ ├── Shared intermediate streaming logic reused by multiple downstream tables
│ ├── Pipeline-private helper logic (not published to catalog)
│ └── Published to UC for external queries → Persistent View (SQL only)
├── Persisted dataset
│ ├── Source is streaming/incremental/continuously growing → Streaming Table
│ │ ├── File ingestion (cloud storage, Volumes) → Auto Loader
│ │ ├── Message bus (Kafka, Kinesis, Pub/Sub, Pulsar, Event Hubs) → streaming source read
│ │ ├── Existing streaming/Delta table → streaming read from table
│ │ ├── CDC / upserts / track changes / keep latest per key / SCD Type 1 or 2 → Auto CDC
│ │ ├── Multiple sources into one table → Append Flows (NOT union)
│ │ ├── Historical backfill + live stream → one-time Append Flow + regular flow
│ │ └── Windowed aggregation with watermark → stateful streaming
│ └── Source is batch/historical/full scan → Materialized View
│ ├── Aggregation/join across full dataset (GROUP BY, SUM, COUNT, etc.)
│ ├── Gold layer aggregation from streaming table → MV with batch read (spark.read / no STREAM)
│ ├── JDBC/Federation/external batch sources
│ └── Small static file load (reference data, no streaming read)
├── Output to external system (Python only) → Sink
│ ├── Existing external table not managed by this pipeline → Sink with format="delta"
│ │ (prefer fully-qualified dataset names if the pipeline should own the table — see Publishing Modes)
│ ├── Kafka / Event Hubs → Sink with format="kafka" + @dp.append_flow(target="sink_name")
│ ├── Custom destination not natively supported → Sink with custom format
│ ├── Custom merge/upsert logic per batch → ForEachBatch Sink (Public Preview)
│ └── Multiple destinations per batch → ForEachBatch Sink (Public Preview)
└── Data quality constraints → Expectations (on any dataset type)spark.read.table / SELECT FROM without STREAM). STs are append-only and don't recompute aggregates when source rows change; MVs do.private=True / CREATE PRIVATE ...) only when materializing once saves significant reprocessing. For preprocessing before Auto CDC, the temp view is required — the CDC flow reads from STREAM(view_name) (SQL) or spark.readStream.table("view_name") (Python).CREATE OR REFRESH vs CREATE → both parse for SQL datasets, but CREATE OR REFRESH is the idiomatic convention. For PRIVATE datasets: CREATE OR REFRESH PRIVATE STREAMING TABLE / ... MATERIALIZED VIEW.value column is mandatory; serialize the row with to_json(struct(*)) AS value. See sink-python.md.SEQUENCE BY STRUCT(col1, col2). Python: sequence_by=struct("col1", "col2"). See the auto-cdc references.APPLY AS TRUNCATE WHEN condition. Python: apply_as_truncates=expr("condition"). Do NOT claim truncate is unsupported.Error → cause/fix mappings agents hit constantly. For DAB-bundle vs CLI-iteration deploy issues, see the workflow-specific reference files.
| Error / symptom | Cause / fix |
|---|---|
Rejection of CREATE OR REPLACE STREAMING TABLE / MATERIALIZED VIEW | CREATE OR REPLACE is standard SQL, NOT SDP. Use CREATE OR REFRESH STREAMING TABLE / CREATE OR REFRESH MATERIALIZED VIEW. |
CLI errors on databricks fs ls /Volumes/... | The dbfs: prefix is required even for UC Volume paths: databricks fs ls dbfs:/Volumes/<catalog>/<schema>/<volume>/<path>. |
DELTA_CLUSTERING_COLUMNS_DATATYPE_NOT_SUPPORTED at first write | A CLUSTER BY column is BOOLEAN / ARRAY / MAP / STRUCT / BINARY. SDP doesn't pre-validate — verify with DESCRIBE before submitting. Cluster keys must be numeric / string / date / timestamp. Full type rules in references/performance.md. |
Cannot create streaming table from batch query | In a streaming-table query you wrote FROM read_files(...) (batch). Use FROM STREAM read_files(...) so Auto Loader kicks in. |
Column not found at ingest time | schemaHints don't match the actual file schema. DESCRIBE a sample file and align the hints. |
| Streaming reads fail with parser error | Use FROM STREAM read_files(...) for file ingestion and FROM stream(table) (or FROM STREAM table_name — legacy DLT, prefer function form) for table-to-table streams. Don't mix. |
Pipeline stuck INITIALIZING for serverless | Normal — first run takes a few minutes for cold start. Don't kill it. |
| Materialized View doesn't incrementally refresh | Automatic incremental refresh for aggregations requires serverless + Delta row tracking on the source (delta.enableRowTracking = true). Without both, falls back to full recompute. Mention the serverless requirement when the user asks about incremental refresh. |
SCD2 query returns nothing / "column not found" on START_AT | Lakeflow uses __START_AT / __END_AT (double underscore). Current rows: WHERE __END_AT IS NULL. |
error.exceptions[0].message missing from your events output | Your jq is reading .message (which is just "Update X is FAILED"). Read error.exceptions[0].message for the real cause — see 2-rapid-iteration-with-cli.md. |
Pipelines use a default catalog and schema configured in the pipeline settings. All datasets are published there unless overridden.
catalog.schema.table in the dataset name to write to a different catalog/schema than the pipeline default. The pipeline creates the dataset there directly — no Sink needed.Before writing pipeline code for any feature, read the linked reference file. Each table below maps the feature to the exact API and to the detail file for that (feature, language).
Some features sit on top of others — read both:
| Feature | Description | Python | SQL | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Streaming Table | Continuous incremental processing, exactly-once, append-only. | @dp.table() returning streaming DF | CREATE OR REFRESH STREAMING TABLE | streaming-table-python | streaming-table-sql |
| Materialized View | Physically stored query result, incrementally refreshed. | @dp.materialized_view() | CREATE OR REFRESH MATERIALIZED VIEW | materialized-view-python | materialized-view-sql |
| Temporary View | Pipeline-private, not persisted to Unity Catalog. | @dp.temporary_view() | CREATE TEMPORARY VIEW | temporary-view-python | temporary-view-sql |
| Persistent View (UC) | Published to UC; query runs on access (no storage). | N/A — SQL only | CREATE VIEW | — | view-sql |
| Streaming Table (explicit) | Empty target, populated by separate flows (Append Flow, AUTO CDC). | dp.create_streaming_table() | CREATE OR REFRESH STREAMING TABLE (no AS) | streaming-table-python | streaming-table-sql |
| Feature | Description | Python | SQL | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Append Flow | Fan-in: multiple sources → one streaming table. Use instead of UNION. | @dp.append_flow() | CREATE FLOW ... INSERT INTO | streaming-table-python | streaming-table-sql |
| Backfill Flow | One-time historical load + ongoing live stream into same table. | @dp.append_flow(once=True) | CREATE FLOW ... INSERT INTO ... ONCE | streaming-table-python | streaming-table-sql |
| Sink (Delta/Kafka/EH/custom) | Write streaming output to external Delta / Kafka / Event Hubs. | dp.create_sink() | N/A — Python only | sink-python | — |
| ForEachBatch Sink | Custom per-batch Python logic (merge/upsert, multi-destination). Public Preview. | @dp.foreach_batch_sink() | N/A — Python only | foreach-batch-sink-python | — |
| Feature | Description | Python | SQL | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Auto CDC (streaming source) | SCD Type 1 (overwrite) or Type 2 (history) from a CDC feed. | dp.create_auto_cdc_flow() | AUTO CDC INTO ... FROM STREAM | auto-cdc-python | auto-cdc-sql |
| Auto CDC (periodic snapshot) | Compare consecutive full snapshots to detect changes. | dp.create_auto_cdc_from_snapshot_flow() | N/A — Python only | auto-cdc-python | — |
For querying SCD Type 2 history tables (__START_AT / __END_AT, point-in-time, joining facts with historical dimensions), see scd-2-querying.md.
| Feature | Description | Python | SQL | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Expect (warn) | Log violations, keep all rows. | @dp.expect() | CONSTRAINT ... EXPECT (...) | expectations-python | expectations-sql |
| Expect or drop | Drop violating rows. | @dp.expect_or_drop() | CONSTRAINT ... EXPECT (...) ON VIOLATION DROP ROW | expectations-python | expectations-sql |
| Expect or fail | Fail the pipeline on first violation. | @dp.expect_or_fail() | CONSTRAINT ... EXPECT (...) ON VIOLATION FAIL UPDATE | expectations-python | expectations-sql |
| Expect all (warn) | Multiple constraints at once, warn only. | @dp.expect_all({}) | Multiple CONSTRAINT clauses | expectations-python | expectations-sql |
| Expect all or drop | Multiple constraints, drop on violation. | @dp.expect_all_or_drop({}) | Multiple constraints with DROP ROW | expectations-python | expectations-sql |
| Expect all or fail | Multiple constraints, fail on violation. | @dp.expect_all_or_fail({}) | Multiple constraints with FAIL UPDATE | expectations-python | expectations-sql |
| Feature | Description | Python | SQL | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Batch read (pipeline dataset) | Read a sibling table as a static DataFrame. | spark.read.table("name") | SELECT ... FROM name | — | — |
| Streaming read (pipeline dataset) | Read a sibling table as a streaming DataFrame. | spark.readStream.table("name") | SELECT ... FROM STREAM(name) | — | — |
| Auto Loader (cloud files) | Incrementally ingest new files from cloud storage. | spark.readStream.format("cloudFiles") | STREAM read_files(...) | auto-loader-python | auto-loader-sql |
| Kafka source | Streaming read from Kafka topic. | spark.readStream.format("kafka") | STREAM read_kafka(...) | kafka | kafka |
| Kinesis source | Streaming read from AWS Kinesis. | spark.readStream.format("kinesis") | STREAM read_kinesis(...) | — | — |
| Pub/Sub source | Streaming read from GCP Pub/Sub. | spark.readStream.format("pubsub") | STREAM read_pubsub(...) | — | — |
| Pulsar source | Streaming read from Apache Pulsar. | spark.readStream.format("pulsar") | STREAM read_pulsar(...) | — | — |
| Event Hubs source | Streaming read from Azure Event Hubs (Kafka protocol). | spark.readStream.format("kafka") + EH config | STREAM read_kafka(...) + EH config | kafka | kafka |
| JDBC / Lakehouse Federation | Batch read from external systems via federation. | spark.read.format("postgresql") etc. | Direct table ref via federation catalog | — | — |
| Custom data source | User-defined Python data source. | spark.read[Stream].format("custom") | N/A — Python only | — | — |
| Static file read (batch) | One-shot load of files (no incremental tracking). | spark.read.format("json"|"csv"|...).load() | read_files(...) (no STREAM) | — | — |
| Skip upstream change commits | Ignore CDC commits on the upstream table. | .option("skipChangeCommits", "true") | read_stream("name", skipChangeCommits => true) | streaming-table-python | streaming-table-sql |
| Feature | Description | Python | SQL | Skill (Py) | Skill (SQL) |
|---|---|---|---|---|---|
| Liquid clustering | Adaptive multi-column data layout; replaces PARTITION + Z-ORDER. Prefer Auto clustering when possible | cluster_by=[...] | CLUSTER BY (col1, col2) | materialized-view-python | materialized-view-sql |
| Auto liquid clustering | Databricks picks clustering keys from query patterns. | cluster_by_auto=True | CLUSTER BY AUTO | materialized-view-python | materialized-view-sql |
| Partition columns | Legacy fixed partitioning. Prefer Liquid Clustering. | partition_cols=[...] | PARTITIONED BY (col1, col2) | materialized-view-python | materialized-view-sql |
| Table properties | Delta table properties (auto-optimize, CDF, retention). | table_properties={...} | TBLPROPERTIES (...) | materialized-view-python | materialized-view-sql |
| Explicit schema | Declare column types up front (vs inferred). | schema="col1 TYPE, ..." | (col1 TYPE, ...) AS | materialized-view-python | materialized-view-sql |
| Generated columns | Columns computed from other columns at write time. | schema="..., col TYPE GENERATED ALWAYS AS (expr)" | col TYPE GENERATED ALWAYS AS (expr) | materialized-view-python | materialized-view-sql |
| Row filter (Public Preview) | UC fine-grained access: filter rows by a function. | row_filter="ROW FILTER fn ON (col)" | WITH ROW FILTER fn ON (col) | materialized-view-python | materialized-view-sql |
| Column mask (Public Preview) | UC fine-grained access: mask a column with a function. | schema="..., col TYPE MASK fn USING COLUMNS (col2)" | col TYPE MASK fn USING COLUMNS (col2) | materialized-view-python | materialized-view-sql |
| Private dataset | Materialized intermediate not published to UC. | private=True | CREATE PRIVATE ... | materialized-view-python | materialized-view-sql |
The tables above show only the modern API. If you see any of the following in user code, it is the legacy DLT syntax — always migrate to the modern form, do not extend it. Read references/dlt-migration.md before suggesting changes so the conversion is correct (especially around apply_changes → create_auto_cdc_flow semantics and partition_cols → cluster_by).
| If you see… | …it's DLT. Migrate to |
|---|---|
import dlt | from pyspark import pipelines as dp |
@dlt.table(...), @dlt.append_flow(...), @dlt.expect* | Same decorator name on dp.* (e.g. @dp.table, @dp.expect_or_drop). |
@dlt.view(...) (or @dp.view(...) if present in older code) | @dp.temporary_view(...) — the modern API has no view decorator, only temporary_view. |
dlt.read("name") / dlt.read_stream("name") | spark.read.table("name") / spark.readStream.table("name") |
dp.read(...) / dp.read_stream(...) | Also legacy — use spark.read.table(...) / spark.readStream.table(...). |
dlt.apply_changes(...) / dp.apply_changes(...) | dp.create_auto_cdc_flow(...). sequence_by accepts a column name (string) or col(...); stored_as_scd_type is integer 2 for Type 2 or string "1" for Type 1. |
dlt.apply_changes_from_snapshot(...) | dp.create_auto_cdc_from_snapshot_flow(...) |
dlt.create_streaming_table(...) | dp.create_streaming_table(...) |
LIVE.<name> prefix in SQL | Bare name (SELECT FROM name for batch, SELECT FROM STREAM(name) for streaming). LIVE. will error in modern pipelines. |
CREATE LIVE TABLE / CREATE STREAMING LIVE TABLE | CREATE OR REFRESH MATERIALIZED VIEW / CREATE OR REFRESH STREAMING TABLE. |
CREATE TEMPORARY LIVE VIEW (a.k.a. CREATE LIVE VIEW) | CREATE TEMPORARY VIEW. Exception: CREATE TEMPORARY VIEW does NOT support CONSTRAINT clauses for expectations — for the rare case where you need expectations on a temp view, CREATE LIVE VIEW is retained. See temporary-view-sql.md and expectations-sql.md. |
APPLY CHANGES INTO ... FROM STREAM ... (SQL) | AUTO CDC INTO ... FROM STREAM ... |
partition_cols=[...] / PARTITIONED BY (...) + ZORDER | cluster_by=[...] / CLUSTER BY (...) (Liquid Clustering). |
input_file_name() | _metadata.file_path (SQL) / F.col("_metadata.file_path") (Python). |
target=... parameter on create_streaming_table / pipeline config | schema=... |
Decide before scaffolding — the choice picks template files (.py vs .sql) and which reference docs apply. Both can coexist, but pick a primary. When unsure, default to SQL for simplicity.
| User signal | Pick |
|---|---|
| "Python pipeline", UDF, pandas, ML inference, pyspark | Python |
| "SQL pipeline", "SQL files" | SQL |
| "Simple pipeline", "create a table", "an aggregation" | SQL (simpler, use it as default) |
| Complex parameterized logic, custom UDFs, ML | Python |
If ambiguous, ask. Stick with the chosen language unless the user explicitly switches.
Three project shapes exist — pick before scaffolding. Default to A for production-bound work and C for exploration / demo scaffolding.
databricks.yml. Scaffold with databricks pipelines init --output-dir . --config-file init-config.json. → 1-project-initialization-with-dab.mddatabricks.yml already exists. Add a resources/<name>.pipeline.yml pointing at src/. → 1-project-initialization-with-dab.md#workflow-b-pipeline-in-existing-bundledatabricks pipelines create / start-update / list-pipeline-events; formalise into a bundle later if the work goes to production. → 2-rapid-iteration-with-cli.mdsrc/ or transformations/.Picking the right run command depends on the workflow chosen above.
Workflow A / B (DAB) — Code changes only take effect after databricks bundle deploy. Always deploy before any run, dry run, or selective refresh.
databricks bundle validate --profile <profile>
databricks bundle deploy -t dev --profile <profile>
databricks bundle run <pipeline_name> -t dev --profile <profile>
databricks pipelines get <pipeline_id> --profile <profile> # status→ Full DAB run + iteration details: references/1-project-initialization-with-dab.md#running-a-pipeline-workflow-a--b
Workflow C (CLI, no bundle) — Upload files to the workspace, then drive the pipeline directly. Re-upload after every code change.
databricks workspace import-dir ./my_pipeline /Workspace/Users/<user>/my_pipeline --overwrite
databricks pipelines start-update <pipeline_id>→ Full CLI run + polling pattern: references/2-rapid-iteration-with-cli.md
Refresh modes (both workflows):
Always poll the update, not top-level pipeline state — see the polling rationale in 2-rapid-iteration-with-cli.md#step-4-start-an-update-and-poll-that-update. Same rule applies to bundle runs.
Project & lifecycle:
Cross-cutting patterns:
Auto Loader format-specific options: JSON · CSV · XML · Parquet · Avro · Text · ORC.
Dataset, flow, CDC, expectation, Auto Loader, and sink references are listed per (feature, language) in the API Reference tables above.
ea6452e
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.