
tessl/pypi-apache-airflow-providers-databricks

Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows


SQL Operations

The Databricks provider offers comprehensive SQL operations for executing queries on Databricks SQL endpoints and clusters. This includes query execution, data loading with COPY INTO, and result management with multiple output formats.

Core Operators

DatabricksSqlOperator

Execute SQL queries on Databricks SQL endpoints with flexible output and result handling.

from typing import Any

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

class DatabricksSqlOperator(SQLExecuteQueryOperator):
    def __init__(
        self,
        sql: str | list[str],
        *,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        output_path: str | None = None,
        output_format: str = "csv",
        csv_params: dict[str, Any] | None = None,
        client_parameters: dict[str, Any] | None = None,
        return_last: bool = True,
        do_xcom_push: bool = True,
        caller: str = "DatabricksSqlOperator",
        **kwargs
    ) -> None:
        """
        Execute SQL queries on Databricks SQL endpoints.
        
        Args:
            sql: SQL query or list of queries to execute
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            output_path: Path to save query results (supports templating)
            output_format: Output format - "csv", "json", or "parquet"
            csv_params: CSV-specific formatting parameters
            client_parameters: Additional client configuration parameters
            return_last: Return only the last result set for multiple queries
            do_xcom_push: Whether to push results to XCom
            caller: Caller identification for logging and monitoring
        """

DatabricksCopyIntoOperator

Load data into Databricks tables using the COPY INTO command with comprehensive format support.

from typing import Any

from airflow.models import BaseOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

class DatabricksCopyIntoOperator(BaseOperator):
    def __init__(
        self,
        *,
        table_name: str,
        file_location: str,
        file_format: str | dict[str, Any],
        files: list[str] | None = None,
        pattern: str | None = None,
        expression_list: list[str] | None = None,
        credential: dict[str, str] | None = None,
        encryption: dict[str, str] | None = None,
        format_options: dict[str, Any] | None = None,
        force_copy: bool = False,
        validate: str | int | bool | None = None,
        copy_options: dict[str, Any] | None = None,
        databricks_conn_id: str = "databricks_default",
        http_path: str | None = None,
        sql_endpoint_name: str | None = None,
        session_configuration: dict[str, str] | None = None,
        http_headers: list[tuple[str, str]] | None = None,
        catalog: str | None = None,
        schema: str | None = None,
        caller: str = "DatabricksCopyIntoOperator",
        **kwargs
    ) -> None:
        """
        Load data using Databricks COPY INTO command.
        
        Args:
            table_name: Target table name for data loading
            file_location: Source file location (cloud storage path)
            file_format: File format specification (string or format options dict)
            files: Specific files to copy (optional, alternative to pattern)
            pattern: File pattern to match for copying
            expression_list: List of expressions for data transformation during copy
            credential: Credential configuration for accessing source files
            encryption: Encryption configuration for source files
            format_options: Format-specific options (delimiter, header, etc.)
            force_copy: Force copy operation even if files were previously copied
            validate: Validation mode - "ALL", number of rows, or boolean
            copy_options: Additional copy operation options
            databricks_conn_id: Airflow connection ID for Databricks
            http_path: HTTP path to SQL endpoint or cluster
            sql_endpoint_name: Name of SQL endpoint to use
            session_configuration: Session-level configuration parameters
            http_headers: Additional HTTP headers for requests
            catalog: Default catalog for SQL operations
            schema: Default schema for SQL operations
            caller: Caller identification for logging and monitoring
        """

Usage Examples

Basic Query Execution

Execute SQL queries with result export:

from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

# Simple query execution
daily_report = DatabricksSqlOperator(
    task_id='generate_daily_report',
    databricks_conn_id='databricks_sql',
    sql="""
        SELECT 
            date_trunc('day', order_timestamp) as order_date,
            customer_segment,
            COUNT(*) as order_count,
            SUM(order_amount) as total_revenue
        FROM sales.orders 
        WHERE date_trunc('day', order_timestamp) = '{{ ds }}'
        GROUP BY date_trunc('day', order_timestamp), customer_segment
        ORDER BY customer_segment
    """,
    catalog='sales',
    schema='reports',
    output_path='/tmp/daily_report_{{ ds }}.csv',
    output_format='csv'
)

Multi-Statement SQL Execution

Execute multiple SQL statements in sequence:

data_pipeline = DatabricksSqlOperator(
    task_id='run_data_pipeline',
    sql=[
        "DROP TABLE IF EXISTS staging.temp_customer_metrics",
        """
        CREATE TABLE staging.temp_customer_metrics AS
        SELECT 
            customer_id,
            COUNT(DISTINCT order_id) as order_count,
            SUM(order_amount) as lifetime_value,
            AVG(order_amount) as avg_order_value,
            MAX(order_timestamp) as last_order_date
        FROM raw.orders
        WHERE order_timestamp >= '{{ macros.ds_add(ds, -30) }}'
        GROUP BY customer_id
        """,
        """
        INSERT INTO analytics.customer_metrics
        SELECT * FROM staging.temp_customer_metrics
        WHERE order_count >= 2
        """,
        "DROP TABLE staging.temp_customer_metrics"
    ],
    databricks_conn_id='databricks_analytics',
    http_path='/sql/1.0/warehouses/warehouse123',
    return_last=False  # Push every statement's result set, not just the last
)
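For intuition, `return_last` controls whether the operator keeps every statement's result set or only the final one. A simplified, Airflow-free sketch of that behavior (the `run_statements` helper and the `fake` executor are hypothetical, not provider API):

```python
from __future__ import annotations

from typing import Any, Callable

def run_statements(
    statements: list[str],
    execute: Callable[[str], list[tuple[Any, ...]]],
    return_last: bool = True,
):
    """Run statements in order and mimic the operator's return_last semantics."""
    results = [execute(stmt) for stmt in statements]
    # return_last=True keeps only the final result set (what the real
    # operator would push to XCom); False keeps one result set per statement.
    return results[-1] if return_last else results

# Hypothetical executor: SELECTs return a row, DDL returns nothing.
fake = lambda sql: [("row",)] if sql.strip().upper().startswith("SELECT") else []

print(run_statements(["DROP TABLE t", "SELECT 1"], fake))                     # last result only
print(run_statements(["DROP TABLE t", "SELECT 1"], fake, return_last=False))  # all results
```

For a pipeline of DDL statements like the one above, the intermediate result sets carry no useful data, which is why the example disables `return_last`-style single-result reporting.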

Query with Parameters and Configuration

Execute parameterized queries with session configuration:

analytical_query = DatabricksSqlOperator(
    task_id='customer_segmentation',
    sql="""
        WITH customer_metrics AS (
            SELECT 
                customer_id,
                SUM(order_amount) as total_spent,
                COUNT(*) as order_frequency,
                DATEDIFF(CURRENT_DATE(), MAX(order_date)) as days_since_last_order
            FROM {{ params.source_table }}
            WHERE order_date >= '{{ macros.ds_add(ds, -365) }}'
            GROUP BY customer_id
        )
        SELECT 
            CASE 
                WHEN total_spent > 1000 AND order_frequency > 10 THEN 'VIP'
                WHEN total_spent > 500 AND order_frequency > 5 THEN 'Premium'
                WHEN days_since_last_order <= 30 THEN 'Active'
                ELSE 'Standard'
            END as segment,
            COUNT(*) as customer_count,
            AVG(total_spent) as avg_spending
        FROM customer_metrics
        GROUP BY segment
        ORDER BY avg_spending DESC
    """,
    params={
        # Note: params values are not themselves templated, so Jinja macros
        # like macros.ds_add must go directly in the SQL, not in params.
        'source_table': 'sales.orders'
    },
    session_configuration={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
    },
    output_path='/analytics/segments/customer_segmentation_{{ ds }}.parquet',
    output_format='parquet'
)

CSV Data Loading with COPY INTO

Load CSV data with format specifications:

from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

load_customer_data = DatabricksCopyIntoOperator(
    task_id='load_customer_csv',
    table_name='raw.customers',
    file_location='s3://data-lake/customers/{{ ds }}/',
    file_format='CSV',
    format_options={
        'header': 'true',
        'delimiter': ',',
        'quote': '"',
        'escape': '\\',
        'inferSchema': 'true',
        'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
    },
    pattern='customer_*.csv',
    copy_options={
        'mergeSchema': 'true',
        'force': 'false'
    },
    databricks_conn_id='databricks_etl',
    catalog='raw',
    schema='ingestion'
)
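Under the hood, the operator assembles these parameters into a `COPY INTO` statement. A rough, illustrative builder for a few of the clauses (this is not the provider's actual implementation; clause names follow the Databricks `COPY INTO` SQL syntax):

```python
from __future__ import annotations

def build_copy_into(
    table_name: str,
    file_location: str,
    file_format: str,
    format_options: dict[str, str] | None = None,
    pattern: str | None = None,
    copy_options: dict[str, str] | None = None,
) -> str:
    """Assemble a simplified COPY INTO statement (illustrative only)."""
    render = lambda opts: ", ".join(f"'{k}' = '{v}'" for k, v in opts.items())
    lines = [
        f"COPY INTO {table_name}",
        f"FROM '{file_location}'",
        f"FILEFORMAT = {file_format}",
    ]
    if pattern:
        lines.append(f"PATTERN = '{pattern}'")
    if format_options:
        lines.append(f"FORMAT_OPTIONS ({render(format_options)})")
    if copy_options:
        lines.append(f"COPY_OPTIONS ({render(copy_options)})")
    return "\n".join(lines)

print(build_copy_into(
    "raw.customers", "s3://data-lake/customers/2024-01-01/", "CSV",
    format_options={"header": "true", "delimiter": ","},
    pattern="customer_*.csv",
))
```

Seeing the generated statement this way can help when debugging load failures: the operator logs the SQL it ran, and each parameter maps to one clause.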

JSON Data Loading

Load JSON files with nested structure handling:

load_events = DatabricksCopyIntoOperator(
    task_id='load_event_json',
    table_name='events.user_actions',
    file_location='s3://event-streams/user-actions/year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/day={{ macros.ds_format(ds, "%Y-%m-%d", "%d") }}/',
    file_format='JSON',
    expression_list=[
        'user_id',
        'action_type',
        'timestamp::timestamp as event_timestamp',
        'properties:device_type::string as device_type',
        'properties:session_id::string as session_id',
        'properties:page_url::string as page_url'
    ],
    format_options={
        'multiLine': 'false',
        'timestampFormat': 'yyyy-MM-dd\'T\'HH:mm:ss.SSSZ'
    },
    validate='ALL',
    databricks_conn_id='databricks_events'
)

Parquet Data Loading with Credentials

Load Parquet files with cloud storage credentials:

load_parquet_data = DatabricksCopyIntoOperator(
    task_id='load_sales_parquet',
    table_name='analytics.daily_sales',
    file_location='abfss://data@storageaccount.dfs.core.windows.net/sales/{{ ds }}/',
    file_format='PARQUET',
    credential={
        'AZURE_SAS_TOKEN': '{{ var.value.azure_sas_token }}'
    },
    files=['sales_summary.parquet', 'sales_details.parquet'],
    force_copy=True,
    databricks_conn_id='databricks_production',
    sql_endpoint_name='analytics-endpoint'
)

Advanced Features

Result Export to Multiple Formats

Export query results in different formats:

# CSV export with custom formatting
csv_export = DatabricksSqlOperator(
    task_id='export_to_csv',
    sql="SELECT * FROM analytics.monthly_summary WHERE report_month = '{{ ds }}'",
    output_path='/reports/monthly_summary_{{ ds }}.csv',
    output_format='csv',
    csv_params={
        'header': True,
        'delimiter': '|',
        'quoteAll': True,
        'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
    }
)
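For intuition about what `csv_params` control, here are roughly equivalent settings expressed with Python's standard `csv` module (the sample rows are illustrative):

```python
import csv
import io

# Header row plus one data row, as the exported result might look.
rows = [("report_month", "revenue"), ("2024-01", "1250.50")]

buf = io.StringIO()
# delimiter='|' and quoting=QUOTE_ALL mirror the csv_params above.
writer = csv.writer(buf, delimiter="|", quoting=csv.QUOTE_ALL)
writer.writerows(rows)
print(buf.getvalue())
```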

# JSON export with nested structures
json_export = DatabricksSqlOperator(
    task_id='export_to_json', 
    sql="""
        SELECT 
            customer_id,
            struct(
                first_name,
                last_name,
                email
            ) as customer_info,
            collect_list(
                struct(order_id, order_date, amount)
            ) as orders
        FROM customer_orders 
        WHERE order_date >= '{{ ds }}'
        GROUP BY customer_id, first_name, last_name, email
    """,
    output_path='/exports/customers_{{ ds }}.json',
    output_format='json'
)

# Parquet export for large datasets
parquet_export = DatabricksSqlOperator(
    task_id='export_to_parquet',
    sql="SELECT * FROM large_dataset WHERE partition_date = '{{ ds }}'",
    output_path='/data/exports/large_dataset_{{ ds }}.parquet',
    output_format='parquet'
)

Session Configuration and Optimization

Configure Spark SQL settings for optimal performance:

optimized_query = DatabricksSqlOperator(
    task_id='optimized_aggregation',
    sql="""
        SELECT 
            product_category,
            DATE_TRUNC('month', sale_date) as sale_month,
            SUM(quantity * unit_price) as revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales_fact s
        JOIN product_dim p ON s.product_id = p.product_id
        WHERE sale_date >= '{{ macros.ds_add(ds, -90) }}'
        GROUP BY product_category, DATE_TRUNC('month', sale_date)
    """,
    session_configuration={
        # Enable adaptive query execution
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.sql.adaptive.skewJoin.enabled': 'true',
        
        # Optimize for large datasets
        'spark.sql.adaptive.advisoryPartitionSizeInBytes': '256MB',
        'spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold': '200MB',
        
        # Enable column pruning and predicate pushdown
        'spark.sql.optimizer.nestedSchemaPruning.enabled': 'true',
        'spark.sql.optimizer.dynamicPartitionPruning.enabled': 'true'
    },
    databricks_conn_id='databricks_analytics',
    catalog='sales',
    schema='aggregated'
)

Data Quality Validation with COPY INTO

Implement data validation during loading:

validated_load = DatabricksCopyIntoOperator(
    task_id='validated_data_load',
    table_name='trusted.customer_transactions',
    file_location='s3://raw-data/transactions/{{ ds }}/',
    file_format={
        'format': 'CSV',
        'options': {
            'header': 'true',
            'delimiter': ',',
            'quote': '"',
            'dateFormat': 'yyyy-MM-dd',
            'timestampFormat': 'yyyy-MM-dd HH:mm:ss'
        }
    },
    expression_list=[
        'customer_id::long as customer_id',
        'transaction_date::date as transaction_date', 
        'amount::decimal(10,2) as amount',
        'transaction_type',
        'CASE WHEN length(trim(description)) = 0 THEN NULL ELSE description END as description',
        '_metadata.file_path as source_file'
    ],
    validate=1000,  # Validate first 1000 rows
    copy_options={
        'force': 'false',
        'mergeSchema': 'false'
    },
    databricks_conn_id='databricks_etl'
)

Custom HTTP Headers and Client Configuration

Configure custom client settings and authentication headers:

custom_client_query = DatabricksSqlOperator(
    task_id='query_with_custom_client',
    sql="SELECT COUNT(*) FROM system.access_logs WHERE log_date = '{{ ds }}'",
    http_headers=[
        ('X-Custom-Header', 'airflow-pipeline'),
        ('X-Request-ID', '{{ run_id }}'),
        ('X-User-Agent', 'Airflow/{{ var.value.airflow_version }}')
    ],
    client_parameters={
        'connect_timeout': 60,
        'socket_timeout': 300,
        '_user_agent_entry': 'AirflowPipeline/1.0'
    },
    databricks_conn_id='databricks_monitoring'
)

Error Handling and Monitoring

Query Result Validation

Validate query results and handle empty results:

from airflow.operators.python import PythonOperator

def validate_results(**context):
    """Custom validation for query results."""
    ti = context['task_instance']
    results = ti.xcom_pull(task_ids='daily_metrics_query')

    if not results:
        raise ValueError("No data found for the specified date")

    row_count = len(results)
    if row_count < 100:  # Expected minimum rows
        raise ValueError(f"Insufficient data: only {row_count} rows found")

daily_metrics_query = DatabricksSqlOperator(
    task_id='daily_metrics_query',
    sql="SELECT * FROM metrics.daily_kpis WHERE metric_date = '{{ ds }}'",
    databricks_conn_id='databricks_metrics'
)

validate_query_results = PythonOperator(
    task_id='validate_query_results',
    python_callable=validate_results
)

daily_metrics_query >> validate_query_results

Retry Configuration for Resilience

Configure robust retry mechanisms:

from datetime import timedelta

resilient_query = DatabricksSqlOperator(
    task_id='resilient_analytics',
    sql="SELECT * FROM complex_analytics_view WHERE process_date = '{{ ds }}'",
    databricks_conn_id='databricks_analytics',
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30)
)
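With `retry_exponential_backoff=True`, Airflow roughly doubles the delay on each attempt and caps it at `max_retry_delay` (the scheduler's actual formula also adds jitter, which this sketch ignores):

```python
from datetime import timedelta

def backoff_delay(
    try_number: int,
    retry_delay: timedelta,
    max_retry_delay: timedelta,
) -> timedelta:
    """Exponential backoff: retry_delay * 2**(try_number - 1), capped."""
    delay = retry_delay * (2 ** (try_number - 1))
    return min(delay, max_retry_delay)

# For the settings above: 5, 10, 20 minutes, then capped at 30.
for attempt in (1, 2, 3, 4):
    print(attempt, backoff_delay(attempt, timedelta(minutes=5), timedelta(minutes=30)))
```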

Together, these SQL operations cover query execution, data loading, and result export on Databricks SQL endpoints, with robust error handling, multiple output formats, and performance tuning options.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-databricks
