tessl/pypi-apache-airflow-providers-apache-spark

Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, sensors, and decorators for distributed data processing workflows.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-apache-spark@5.3.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-spark@5.3.0


Apache Airflow Providers Apache Spark

A provider package that integrates Apache Airflow with the Apache Spark distributed computing framework. It supplies operators, hooks, and decorators for orchestrating Spark jobs within Airflow workflows, and supports multiple submission interfaces: spark-submit, Spark SQL, Spark JDBC transfers, and the modern Spark Connect protocol.

Package Information

  • Package Name: apache-airflow-providers-apache-spark
  • Language: Python
  • Installation: pip install apache-airflow-providers-apache-spark
  • Requires: Apache Airflow >= 2.10.0, PySpark >= 3.5.2, grpcio-status >= 1.59.0

Core Imports

# Operators for executing Spark jobs
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Hooks for Spark connections
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook
from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

# Task decorator for PySpark functions
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task

Basic Usage

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task

# Define DAG
dag = DAG(
    'spark_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Spark workflow',
    schedule=timedelta(days=1),  # 'schedule' replaces the deprecated 'schedule_interval'
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

# Submit a Spark application
spark_job = SparkSubmitOperator(
    task_id='run_spark_job',
    application='/path/to/spark_app.py',
    conn_id='spark_default',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
    },
    dag=dag,
)

# Execute SQL query with Spark
spark_sql = SparkSqlOperator(
    task_id='run_spark_sql',
    sql='SELECT COUNT(*) FROM users WHERE active = true',
    conn_id='spark_sql_default',
    dag=dag,
)

# PySpark task decorator example
@pyspark_task(task_id='process_data', dag=dag)  # pass dag since the DAG is not used as a context manager
def process_user_data(spark):
    df = spark.read.parquet('/data/users.parquet')
    result = df.filter(df.active == True).groupBy('region').count()
    result.write.mode('overwrite').parquet('/data/user_counts.parquet')
    return result.count()

process_task = process_user_data()

# Set task dependencies
spark_job >> spark_sql >> process_task

Architecture

The Apache Spark provider follows Airflow's standard provider pattern with distinct layers:

  • Operators: Task-level components that execute Spark jobs within Airflow workflows
  • Hooks: Connection management for various Spark interfaces (Submit, SQL, JDBC, Connect)
  • Decorators: Pythonic task decorators that automatically inject Spark sessions
  • Connection Types: Pre-configured connection interfaces for different Spark deployment modes

This design enables flexible integration with various Spark deployment architectures including local mode, YARN clusters, Kubernetes, Standalone clusters, and cloud-managed Spark services through consistent Airflow abstractions.
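
Because the operator layer wraps the hook layer, the same submission can be expressed at either level. The snippet below is a minimal sketch of that layering; it assumes a spark_default connection and a hypothetical /apps/etl_job.py script.

# Sketch: the operator layer wraps the hook layer; a plain Python task can call
# the hook directly when finer control is needed. Paths and IDs are placeholders.
from airflow.decorators import task
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Operator layer: declarative task definition; Airflow handles scheduling and retries
submit_via_operator = SparkSubmitOperator(
    task_id='submit_via_operator',
    application='/apps/etl_job.py',  # hypothetical application path
    conn_id='spark_default',
)

# Hook layer: imperative submission from inside any Python task
@task
def submit_via_hook():
    hook = SparkSubmitHook(conn_id='spark_default', name='etl-job-via-hook')
    hook.submit(application='/apps/etl_job.py')  # blocks until spark-submit exits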

Capabilities

Spark Job Execution

Execute Spark applications using the spark-submit binary, with comprehensive configuration support. The operator handles application submission, monitoring, and resource management across different cluster managers.

class SparkSubmitOperator(BaseOperator):
    def __init__(
        self,
        application: str | None = None,
        conf: dict | None = None,
        conn_id: str = 'spark_default',
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str | None = None,
        num_executors: int | None = None,
        application_args: list | None = None,
        env_vars: dict | None = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str | None = None,
        **kwargs
    ): ...
    
    def execute(self, context): ...
    def on_kill(self): ...
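
Besides PySpark scripts, the operator also handles packaged JVM applications. The sketch below uses only parameters from the signature above; the JAR path, main class, and package coordinates are placeholders.

# Sketch: submitting a packaged JVM Spark application; the JAR path, main class,
# and package coordinates are placeholders. Define this inside a DAG
# (e.g. within a `with DAG(...):` block) so the task attaches to it.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

scala_job = SparkSubmitOperator(
    task_id='run_scala_job',
    application='/apps/jobs/aggregator.jar',   # hypothetical JAR location
    java_class='com.example.jobs.Aggregator',  # hypothetical entry point
    packages='org.apache.spark:spark-avro_2.12:3.5.1',
    executor_memory='4g',
    executor_cores=2,
    num_executors=4,
    application_args=['--run-date', '{{ ds }}'],  # application_args is templated
    conn_id='spark_default',
    verbose=True,
)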

Spark Operators

Spark Connection Management

Manage connections to the various Spark interfaces: traditional spark-submit, Spark SQL, JDBC operations, and the modern Spark Connect protocol. The hooks provide connection configuration, authentication, and cluster communication.

class SparkSubmitHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"
    
    def submit(self, application: str, **kwargs) -> None: ...
    def on_kill(self) -> None: ...
    def get_conn(self): ...

class SparkConnectHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"
    
    def get_connection_url(self) -> str: ...
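
As a sketch of direct hook usage, the snippet below builds a Spark Connect session from the URL returned by get_connection_url(). It assumes a spark_connect_default connection pointing at a running Spark Connect server and a hypothetical Parquet dataset path.

# Sketch: creating a remote SparkSession via the Spark Connect hook.
# Assumes a 'spark_connect_default' connection and a placeholder dataset path.
from airflow.decorators import task
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook
from pyspark.sql import SparkSession

@task
def row_count_via_spark_connect() -> int:
    url = SparkConnectHook(conn_id=SparkConnectHook.default_conn_name).get_connection_url()
    spark = SparkSession.builder.remote(url).getOrCreate()
    try:
        return spark.read.parquet('/data/users.parquet').count()
    finally:
        spark.stop()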

Spark Hooks

PySpark Task Integration

Create PySpark tasks that automatically receive a SparkSession object, so PySpark code runs inside Airflow workflows with automatic session management and cleanup.

def pyspark_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator: ...

# Usage example:
@pyspark_task
def my_spark_function(spark):
    """Function receives SparkSession as 'spark' parameter"""
    df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
    return df.count()
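
A slightly fuller sketch is shown below. It assumes the decorator accepts a conn_id argument (to resolve the target cluster from an Airflow connection) and injects the SparkContext as sc alongside the session; verify both against your installed provider version.

# Sketch: targeting a configured cluster from a decorated task.
# The conn_id argument and the injected 'sc' parameter are assumptions here;
# check them against the installed provider version. The input path is a placeholder.
@pyspark_task(task_id='count_ok_events', conn_id='spark_default')
def count_ok_events(spark, sc):
    df = spark.read.json('/data/events/')       # hypothetical input path
    return df.where(df.status == 'ok').count()  # runs on the resolved cluster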

PySpark Decorators

Connection Types

The provider registers these connection types in Airflow:

  1. spark - For SparkSubmitHook connections to Spark clusters
  2. spark_sql - For SparkSqlHook connections to execute SQL queries
  3. spark_jdbc - For SparkJDBCHook connections for database transfers
  4. spark_connect - For SparkConnectHook connections using Spark Connect protocol

Each connection type provides custom UI fields for configuration including cluster URLs, authentication credentials, SSL settings, and deployment-specific parameters.
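
As a rough sketch, a spark connection could be defined programmatically (or supplied via the equivalent AIRFLOW_CONN_* environment variable). The host, port, and extra values below are placeholders, not required settings.

# Sketch: defining a 'spark' connection in code; host, port, and extras are placeholders.
from airflow.models.connection import Connection

spark_conn = Connection(
    conn_id='spark_default',
    conn_type='spark',
    host='spark://spark-master.internal',  # cluster master URL
    port=7077,
    extra='{"deploy-mode": "cluster"}',    # deployment-specific parameters
)
print(spark_conn.get_uri())  # URI form, e.g. for an AIRFLOW_CONN_SPARK_DEFAULT variable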

Error Handling

Common exceptions that may be raised:

  • AirflowException - General Spark job execution failures, configuration errors
  • AirflowNotFoundException - Missing Spark applications, connection configurations
  • Connection errors - Cluster connectivity issues, authentication failures
  • Spark application errors - Application-specific failures, resource constraints

Handle these exceptions in your DAG error handling and retry logic as appropriate for your use case.
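
A minimal sketch of that pattern, assuming a hypothetical application path: retries are configured on the task, and AirflowException is caught only to add context before re-raising so Airflow's retry machinery still runs.

# Sketch: retries plus explicit exception handling around a hook-driven submission.
# The application path is a placeholder.
from datetime import timedelta
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

@task(retries=3, retry_delay=timedelta(minutes=10))
def submit_with_handling():
    hook = SparkSubmitHook(conn_id='spark_default')
    try:
        hook.submit(application='/apps/nightly_job.py')
    except AirflowException as exc:
        print(f'Spark submission failed: {exc}')  # add context, then let Airflow retry
        raise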