CtrlK
BlogDocsLog inGet started
Tessl Logo

databricks-pipelines

Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation.

71

Quality

87%

Does it follow best practices?

Impact

No eval scenarios have been run

SecuritybySnyk

Passed

No known issues

SKILL.md
Quality
Evals
Security

Lakeflow Spark Declarative Pipelines Development

FIRST: Use the parent databricks-core skill for CLI basics, authentication, profile selection, and data discovery commands.

Decision Tree

Use this tree to determine which dataset type and features to use. Multiple features can apply to the same dataset — e.g., a Streaming Table can use Auto Loader for ingestion, Append Flows for fan-in, and Expectations for data quality. Choose the dataset type first, then layer on applicable features.

User request → What kind of output?
├── Intermediate/reusable logic (not persisted) → Temporary View
│   ├── Preprocessing/filtering before Auto CDC → Temporary View feeding CDC flow
│   ├── Shared intermediate streaming logic reused by multiple downstream tables
│   ├── Pipeline-private helper logic (not published to catalog)
│   └── Published to UC for external queries → Persistent View (SQL only)
├── Persisted dataset
│   ├── Source is streaming/incremental/continuously growing → Streaming Table
│   │   ├── File ingestion (cloud storage, Volumes) → Auto Loader
│   │   ├── Message bus (Kafka, Kinesis, Pub/Sub, Pulsar, Event Hubs) → streaming source read
│   │   ├── Existing streaming/Delta table → streaming read from table
│   │   ├── CDC / upserts / track changes / keep latest per key / SCD Type 1 or 2 → Auto CDC
│   │   ├── Multiple sources into one table → Append Flows (NOT union)
│   │   ├── Historical backfill + live stream → one-time Append Flow + regular flow
│   │   └── Windowed aggregation with watermark → stateful streaming
│   └── Source is batch/historical/full scan → Materialized View
│       ├── Aggregation/join across full dataset (GROUP BY, SUM, COUNT, etc.)
│       ├── Gold layer aggregation from streaming table → MV with batch read (spark.read / no STREAM)
│       ├── JDBC/Federation/external batch sources
│       └── Small static file load (reference data, no streaming read)
├── Output to external system (Python only) → Sink
│   ├── Existing external table not managed by this pipeline → Sink with format="delta"
│   │   (prefer fully-qualified dataset names if the pipeline should own the table — see Publishing Modes)
│   ├── Kafka / Event Hubs → Sink with format="kafka" + @dp.append_flow(target="sink_name")
│   ├── Custom destination not natively supported → Sink with custom format
│   ├── Custom merge/upsert logic per batch → ForEachBatch Sink (Public Preview)
│   └── Multiple destinations per batch → ForEachBatch Sink (Public Preview)
└── Data quality constraints → Expectations (on any dataset type)

Common Traps

  • Names → SDP = LDP = Lakeflow Declarative Pipelines = (formerly) DLT. All interchangeable when the user mentions them.
  • "Create a table" without specifying type → ask whether the source is streaming or batch. Streaming source → Streaming Table; batch source → Materialized View. Mismatched pairs error at validation.
  • Aggregation over a streaming source → use a Materialized View with a batch read (spark.read.table / SELECT FROM without STREAM). STs are append-only and don't recompute aggregates when source rows change; MVs do.
  • Intermediate logic → default to a Temporary View. Even for shared logic reused by multiple downstream tables. Use a Private MV/ST (private=True / CREATE PRIVATE ...) only when materializing once saves significant reprocessing. For preprocessing before Auto CDC, the temp view is required — the CDC flow reads from STREAM(view_name) (SQL) or spark.readStream.table("view_name") (Python).
  • Union of streams → use multiple Append Flows. UNION across streaming sources is an anti-pattern.
  • Changing dataset type → cannot change ST→MV or MV→ST in place. Full refresh does NOT help. Drop the existing table manually or rename the new dataset.
  • CREATE OR REFRESH vs CREATE → both parse for SQL datasets, but CREATE OR REFRESH is the idiomatic convention. For PRIVATE datasets: CREATE OR REFRESH PRIVATE STREAMING TABLE / ... MATERIALIZED VIEW.
  • Kafka/Event Hubs sink serialization → the value column is mandatory; serialize the row with to_json(struct(*)) AS value. See sink-python.md.
  • Multi-column Auto CDC sequencing → SQL: SEQUENCE BY STRUCT(col1, col2). Python: sequence_by=struct("col1", "col2"). See the auto-cdc references.
  • Auto CDC TRUNCATE (SCD Type 1 only) → SQL: APPLY AS TRUNCATE WHEN condition. Python: apply_as_truncates=expr("condition"). Do NOT claim truncate is unsupported.
  • Python-only features → Sinks, ForEachBatch Sinks, CDC from snapshots, and custom data sources are Python-only. When the user is working in SQL, clarify this and suggest switching to Python.
  • Recommend ONE clear approach → present a single recommended path. Don't list anti-patterns or inferior alternatives — they confuse. Only mention alternatives when they genuinely offer different trade-offs.

Common Issues

Error → cause/fix mappings agents hit constantly. For DAB-bundle vs CLI-iteration deploy issues, see the workflow-specific reference files.

Error / symptomCause / fix
Rejection of CREATE OR REPLACE STREAMING TABLE / MATERIALIZED VIEWCREATE OR REPLACE is standard SQL, NOT SDP. Use CREATE OR REFRESH STREAMING TABLE / CREATE OR REFRESH MATERIALIZED VIEW.
CLI errors on databricks fs ls /Volumes/...The dbfs: prefix is required even for UC Volume paths: databricks fs ls dbfs:/Volumes/<catalog>/<schema>/<volume>/<path>.
DELTA_CLUSTERING_COLUMNS_DATATYPE_NOT_SUPPORTED at first writeA CLUSTER BY column is BOOLEAN / ARRAY / MAP / STRUCT / BINARY. SDP doesn't pre-validate — verify with DESCRIBE before submitting. Cluster keys must be numeric / string / date / timestamp. Full type rules in references/performance.md.
Cannot create streaming table from batch queryIn a streaming-table query you wrote FROM read_files(...) (batch). Use FROM STREAM read_files(...) so Auto Loader kicks in.
Column not found at ingest timeschemaHints don't match the actual file schema. DESCRIBE a sample file and align the hints.
Streaming reads fail with parser errorUse FROM STREAM read_files(...) for file ingestion and FROM stream(table) (or FROM STREAM table_name — legacy DLT, prefer function form) for table-to-table streams. Don't mix.
Pipeline stuck INITIALIZING for serverlessNormal — first run takes a few minutes for cold start. Don't kill it.
Materialized View doesn't incrementally refreshAutomatic incremental refresh for aggregations requires serverless + Delta row tracking on the source (delta.enableRowTracking = true). Without both, falls back to full recompute. Mention the serverless requirement when the user asks about incremental refresh.
SCD2 query returns nothing / "column not found" on START_ATLakeflow uses __START_AT / __END_AT (double underscore). Current rows: WHERE __END_AT IS NULL.
error.exceptions[0].message missing from your events outputYour jq is reading .message (which is just "Update X is FAILED"). Read error.exceptions[0].message for the real cause — see 2-rapid-iteration-with-cli.md.

Publishing Modes

Pipelines use a default catalog and schema configured in the pipeline settings. All datasets are published there unless overridden.

  • Fully-qualified names: Use catalog.schema.table in the dataset name to write to a different catalog/schema than the pipeline default. The pipeline creates the dataset there directly — no Sink needed.
  • USE CATALOG / USE SCHEMA: SQL commands that change the current catalog/schema for all subsequent definitions in the same file.
  • LIVE prefix: Deprecated. Ignored in the default publishing mode.
  • When reading or defining datasets within the pipeline, use the dataset name only — do NOT use fully-qualified names unless the pipeline already does so or the user explicitly requests a different target catalog/schema.

API Reference

Before writing pipeline code for any feature, read the linked reference file. Each table below maps the feature to the exact API and to the detail file for that (feature, language).

Some features sit on top of others — read both:

  • Auto Loader / Auto CDC / Sinks target a streaming table → also read streaming-table-python.md / streaming-table-sql.md.
  • Expectations attach to a dataset → also read the dataset definition file (streaming-table / materialized-view / temporary-view).

Dataset Definition APIs

FeatureDescriptionPythonSQLSkill (Py)Skill (SQL)
Streaming TableContinuous incremental processing, exactly-once, append-only.@dp.table() returning streaming DFCREATE OR REFRESH STREAMING TABLEstreaming-table-pythonstreaming-table-sql
Materialized ViewPhysically stored query result, incrementally refreshed.@dp.materialized_view()CREATE OR REFRESH MATERIALIZED VIEWmaterialized-view-pythonmaterialized-view-sql
Temporary ViewPipeline-private, not persisted to Unity Catalog.@dp.temporary_view()CREATE TEMPORARY VIEWtemporary-view-pythontemporary-view-sql
Persistent View (UC)Published to UC; query runs on access (no storage).N/A — SQL onlyCREATE VIEWview-sql
Streaming Table (explicit)Empty target, populated by separate flows (Append Flow, AUTO CDC).dp.create_streaming_table()CREATE OR REFRESH STREAMING TABLE (no AS)streaming-table-pythonstreaming-table-sql

Flow and Sink APIs

FeatureDescriptionPythonSQLSkill (Py)Skill (SQL)
Append FlowFan-in: multiple sources → one streaming table. Use instead of UNION.@dp.append_flow()CREATE FLOW ... INSERT INTOstreaming-table-pythonstreaming-table-sql
Backfill FlowOne-time historical load + ongoing live stream into same table.@dp.append_flow(once=True)CREATE FLOW ... INSERT INTO ... ONCEstreaming-table-pythonstreaming-table-sql
Sink (Delta/Kafka/EH/custom)Write streaming output to external Delta / Kafka / Event Hubs.dp.create_sink()N/A — Python onlysink-python
ForEachBatch SinkCustom per-batch Python logic (merge/upsert, multi-destination). Public Preview.@dp.foreach_batch_sink()N/A — Python onlyforeach-batch-sink-python

CDC APIs

FeatureDescriptionPythonSQLSkill (Py)Skill (SQL)
Auto CDC (streaming source)SCD Type 1 (overwrite) or Type 2 (history) from a CDC feed.dp.create_auto_cdc_flow()AUTO CDC INTO ... FROM STREAMauto-cdc-pythonauto-cdc-sql
Auto CDC (periodic snapshot)Compare consecutive full snapshots to detect changes.dp.create_auto_cdc_from_snapshot_flow()N/A — Python onlyauto-cdc-python

For querying SCD Type 2 history tables (__START_AT / __END_AT, point-in-time, joining facts with historical dimensions), see scd-2-querying.md.

Data Quality APIs

FeatureDescriptionPythonSQLSkill (Py)Skill (SQL)
Expect (warn)Log violations, keep all rows.@dp.expect()CONSTRAINT ... EXPECT (...)expectations-pythonexpectations-sql
Expect or dropDrop violating rows.@dp.expect_or_drop()CONSTRAINT ... EXPECT (...) ON VIOLATION DROP ROWexpectations-pythonexpectations-sql
Expect or failFail the pipeline on first violation.@dp.expect_or_fail()CONSTRAINT ... EXPECT (...) ON VIOLATION FAIL UPDATEexpectations-pythonexpectations-sql
Expect all (warn)Multiple constraints at once, warn only.@dp.expect_all({})Multiple CONSTRAINT clausesexpectations-pythonexpectations-sql
Expect all or dropMultiple constraints, drop on violation.@dp.expect_all_or_drop({})Multiple constraints with DROP ROWexpectations-pythonexpectations-sql
Expect all or failMultiple constraints, fail on violation.@dp.expect_all_or_fail({})Multiple constraints with FAIL UPDATEexpectations-pythonexpectations-sql

Reading Data APIs

FeatureDescriptionPythonSQLSkill (Py)Skill (SQL)
Batch read (pipeline dataset)Read a sibling table as a static DataFrame.spark.read.table("name")SELECT ... FROM name
Streaming read (pipeline dataset)Read a sibling table as a streaming DataFrame.spark.readStream.table("name")SELECT ... FROM STREAM(name)
Auto Loader (cloud files)Incrementally ingest new files from cloud storage.spark.readStream.format("cloudFiles")STREAM read_files(...)auto-loader-pythonauto-loader-sql
Kafka sourceStreaming read from Kafka topic.spark.readStream.format("kafka")STREAM read_kafka(...)kafkakafka
Kinesis sourceStreaming read from AWS Kinesis.spark.readStream.format("kinesis")STREAM read_kinesis(...)
Pub/Sub sourceStreaming read from GCP Pub/Sub.spark.readStream.format("pubsub")STREAM read_pubsub(...)
Pulsar sourceStreaming read from Apache Pulsar.spark.readStream.format("pulsar")STREAM read_pulsar(...)
Event Hubs sourceStreaming read from Azure Event Hubs (Kafka protocol).spark.readStream.format("kafka") + EH configSTREAM read_kafka(...) + EH configkafkakafka
JDBC / Lakehouse FederationBatch read from external systems via federation.spark.read.format("postgresql") etc.Direct table ref via federation catalog
Custom data sourceUser-defined Python data source.spark.read[Stream].format("custom")N/A — Python only
Static file read (batch)One-shot load of files (no incremental tracking).spark.read.format("json"|"csv"|...).load()read_files(...) (no STREAM)
Skip upstream change commitsIgnore CDC commits on the upstream table..option("skipChangeCommits", "true")read_stream("name", skipChangeCommits => true)streaming-table-pythonstreaming-table-sql

Table/Schema Feature APIs

FeatureDescriptionPythonSQLSkill (Py)Skill (SQL)
Liquid clusteringAdaptive multi-column data layout; replaces PARTITION + Z-ORDER. Prefer Auto clustering when possiblecluster_by=[...]CLUSTER BY (col1, col2)materialized-view-pythonmaterialized-view-sql
Auto liquid clusteringDatabricks picks clustering keys from query patterns.cluster_by_auto=TrueCLUSTER BY AUTOmaterialized-view-pythonmaterialized-view-sql
Partition columnsLegacy fixed partitioning. Prefer Liquid Clustering.partition_cols=[...]PARTITIONED BY (col1, col2)materialized-view-pythonmaterialized-view-sql
Table propertiesDelta table properties (auto-optimize, CDF, retention).table_properties={...}TBLPROPERTIES (...)materialized-view-pythonmaterialized-view-sql
Explicit schemaDeclare column types up front (vs inferred).schema="col1 TYPE, ..."(col1 TYPE, ...) ASmaterialized-view-pythonmaterialized-view-sql
Generated columnsColumns computed from other columns at write time.schema="..., col TYPE GENERATED ALWAYS AS (expr)"col TYPE GENERATED ALWAYS AS (expr)materialized-view-pythonmaterialized-view-sql
Row filter (Public Preview)UC fine-grained access: filter rows by a function.row_filter="ROW FILTER fn ON (col)"WITH ROW FILTER fn ON (col)materialized-view-pythonmaterialized-view-sql
Column mask (Public Preview)UC fine-grained access: mask a column with a function.schema="..., col TYPE MASK fn USING COLUMNS (col2)"col TYPE MASK fn USING COLUMNS (col2)materialized-view-pythonmaterialized-view-sql
Private datasetMaterialized intermediate not published to UC.private=TrueCREATE PRIVATE ...materialized-view-pythonmaterialized-view-sql

Legacy DLT Syntax — always migrate

The tables above show only the modern API. If you see any of the following in user code, it is the legacy DLT syntax — always migrate to the modern form, do not extend it. Read references/dlt-migration.md before suggesting changes so the conversion is correct (especially around apply_changescreate_auto_cdc_flow semantics and partition_colscluster_by).

If you see……it's DLT. Migrate to
import dltfrom pyspark import pipelines as dp
@dlt.table(...), @dlt.append_flow(...), @dlt.expect*Same decorator name on dp.* (e.g. @dp.table, @dp.expect_or_drop).
@dlt.view(...) (or @dp.view(...) if present in older code)@dp.temporary_view(...) — the modern API has no view decorator, only temporary_view.
dlt.read("name") / dlt.read_stream("name")spark.read.table("name") / spark.readStream.table("name")
dp.read(...) / dp.read_stream(...)Also legacy — use spark.read.table(...) / spark.readStream.table(...).
dlt.apply_changes(...) / dp.apply_changes(...)dp.create_auto_cdc_flow(...). sequence_by accepts a column name (string) or col(...); stored_as_scd_type is integer 2 for Type 2 or string "1" for Type 1.
dlt.apply_changes_from_snapshot(...)dp.create_auto_cdc_from_snapshot_flow(...)
dlt.create_streaming_table(...)dp.create_streaming_table(...)
LIVE.<name> prefix in SQLBare name (SELECT FROM name for batch, SELECT FROM STREAM(name) for streaming). LIVE. will error in modern pipelines.
CREATE LIVE TABLE / CREATE STREAMING LIVE TABLECREATE OR REFRESH MATERIALIZED VIEW / CREATE OR REFRESH STREAMING TABLE.
CREATE TEMPORARY LIVE VIEW (a.k.a. CREATE LIVE VIEW)CREATE TEMPORARY VIEW. Exception: CREATE TEMPORARY VIEW does NOT support CONSTRAINT clauses for expectations — for the rare case where you need expectations on a temp view, CREATE LIVE VIEW is retained. See temporary-view-sql.md and expectations-sql.md.
APPLY CHANGES INTO ... FROM STREAM ... (SQL)AUTO CDC INTO ... FROM STREAM ...
partition_cols=[...] / PARTITIONED BY (...) + ZORDERcluster_by=[...] / CLUSTER BY (...) (Liquid Clustering).
input_file_name()_metadata.file_path (SQL) / F.col("_metadata.file_path") (Python).
target=... parameter on create_streaming_table / pipeline configschema=...

Language Selection (Python vs SQL)

Decide before scaffolding — the choice picks template files (.py vs .sql) and which reference docs apply. Both can coexist, but pick a primary. When unsure, default to SQL for simplicity.

User signalPick
"Python pipeline", UDF, pandas, ML inference, pysparkPython
"SQL pipeline", "SQL files"SQL
"Simple pipeline", "create a table", "an aggregation"SQL (simpler, use it as default)
Complex parameterized logic, custom UDFs, MLPython

If ambiguous, ask. Stick with the chosen language unless the user explicitly switches.

Choose Your Workflow

Three project shapes exist — pick before scaffolding. Default to A for production-bound work and C for exploration / demo scaffolding.

Pipeline Structure

  • Follow the medallion pattern (Bronze → Silver → Gold) unless the user says otherwise. Keep it simple by default — just a few tables.
  • One dataset per file, named after the dataset. Transformation files live in src/ or transformations/.
  • Gold layer: preserve key business dimensions. When aggregating into Gold, keep the dimensions analysts will filter / slice by (location, department, product line, customer segment, time period). Over-aggregating loses information that can't be recovered downstream. If a dashboard is mentioned, every filter on it needs to be a column in the Gold table. Easier to aggregate further in queries than to recover lost dimensions.

Running a Pipeline

Picking the right run command depends on the workflow chosen above.

  • Workflow A / B (DAB) — Code changes only take effect after databricks bundle deploy. Always deploy before any run, dry run, or selective refresh.

    databricks bundle validate --profile <profile>
    databricks bundle deploy -t dev --profile <profile>
    databricks bundle run <pipeline_name> -t dev --profile <profile>
    databricks pipelines get <pipeline_id> --profile <profile>      # status

    → Full DAB run + iteration details: references/1-project-initialization-with-dab.md#running-a-pipeline-workflow-a--b

  • Workflow C (CLI, no bundle) — Upload files to the workspace, then drive the pipeline directly. Re-upload after every code change.

    databricks workspace import-dir ./my_pipeline /Workspace/Users/<user>/my_pipeline --overwrite
    databricks pipelines start-update <pipeline_id>

    → Full CLI run + polling pattern: references/2-rapid-iteration-with-cli.md

Refresh modes (both workflows):

  • Selective refresh is preferred when you only need to run one table. Dependencies must already be materialized.
  • Full refresh is the most expensive and dangerous option and can lead to data loss (it reprocesses streaming sources from scratch, destroying streaming state). Use only when really necessary. Always suggest it as a follow-up the user must explicitly approve.

Always poll the update, not top-level pipeline state — see the polling rationale in 2-rapid-iteration-with-cli.md#step-4-start-an-update-and-poll-that-update. Same rule applies to bundle runs.

Reference Index

Project & lifecycle:

Cross-cutting patterns:

  • streaming-patterns.md — Dedup, windowed aggregations, late data, rescue-data quarantine, anomaly detection, lag monitoring.
  • scd-2-querying.md — Current-state, point-in-time, joining facts with historical dims.
  • kafka.md — Kafka / Event Hubs ingestion.

Auto Loader format-specific options: JSON · CSV · XML · Parquet · Avro · Text · ORC.

Dataset, flow, CDC, expectation, Auto Loader, and sink references are listed per (feature, language) in the API Reference tables above.

Repository
databricks/databricks-agent-skills
Last updated
Created

Is this your skill?

If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.