Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, and decorators for distributed data processing workflows.
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-spark@5.3.0

A comprehensive provider package that enables seamless integration between Apache Airflow and the Apache Spark distributed computing framework. This package provides operators, hooks, and decorators for orchestrating Spark jobs within Airflow workflows, supporting multiple integration modes including spark-submit, Spark SQL, Spark JDBC operations, and the modern Spark Connect protocol.
pip install apache-airflow-providers-apache-spark

# Operators for executing Spark jobs
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
# Hooks for Spark connections
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook
from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook
# Task decorator for PySpark functions
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task
# Define DAG
dag = DAG(
    'spark_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Spark workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
# Submit a Spark application
spark_job = SparkSubmitOperator(
    task_id='run_spark_job',
    application='/path/to/spark_app.py',
    conn_id='spark_default',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
    },
    dag=dag,
)
# Execute SQL query with Spark
spark_sql = SparkSqlOperator(
    task_id='run_spark_sql',
    sql='SELECT COUNT(*) FROM users WHERE active = true',
    conn_id='spark_sql_default',
    dag=dag,
)
# PySpark task decorator example
@pyspark_task(task_id='process_data', dag=dag)
def process_user_data(spark):
    df = spark.read.parquet('/data/users.parquet')
    result = df.filter(df.active == True).groupBy('region').count()
    result.write.mode('overwrite').parquet('/data/user_counts.parquet')
    return result.count()
process_task = process_user_data()
# Set task dependencies
spark_job >> spark_sql >> process_task

The Apache Spark provider follows Airflow's standard provider pattern with distinct layers: operators that define tasks, hooks that encapsulate connection handling and job submission, and a task decorator for running PySpark code inline.
This design enables flexible integration with various Spark deployment architectures including local mode, YARN clusters, Kubernetes, Standalone clusters, and cloud-managed Spark services through consistent Airflow abstractions.
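As a rough sketch of that flexibility, the same operator definition can target different clusters simply by pointing conn_id at a differently configured Spark connection, whose host field carries the master URL. The connection IDs and master URLs below are illustrative assumptions, and in practice connections are created through the Airflow UI, CLI, or a secrets backend rather than constructed in DAG code:

from airflow.models import Connection
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Illustrative connections only: the host field of a 'spark' connection holds the master URL.
yarn_conn = Connection(
    conn_id='spark_yarn',                              # hypothetical connection ID
    conn_type='spark',
    host='yarn',                                       # submit to a YARN cluster
)
k8s_conn = Connection(
    conn_id='spark_k8s',                               # hypothetical connection ID
    conn_type='spark',
    host='k8s://https://kube-api.example.com:443',     # submit to a Kubernetes cluster
)

# Inside a DAG, the same task definition runs against either target by switching conn_id.
job_on_yarn = SparkSubmitOperator(
    task_id='job_on_yarn',
    application='/path/to/spark_app.py',
    conn_id='spark_yarn',
)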
Execute Spark applications using the spark-submit binary with comprehensive configuration support. Handles Spark application submission, monitoring, and resource management across different cluster managers.
class SparkSubmitOperator(BaseOperator):
    def __init__(
        self,
        application: str = None,
        conf: dict = None,
        conn_id: str = 'spark_default',
        files: str = None,
        py_files: str = None,
        archives: str = None,
        driver_class_path: str = None,
        jars: str = None,
        java_class: str = None,
        packages: str = None,
        exclude_packages: str = None,
        repositories: str = None,
        total_executor_cores: int = None,
        executor_cores: int = None,
        executor_memory: str = None,
        driver_memory: str = None,
        keytab: str = None,
        principal: str = None,
        proxy_user: str = None,
        name: str = None,
        num_executors: int = None,
        application_args: list = None,
        env_vars: dict = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str = None,
        **kwargs
    ): ...

    def execute(self, context): ...
    def on_kill(self): ...
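As a hedged sketch of how the JVM-oriented arguments above fit together (the JAR path, main class, package coordinates, and resource sizes are illustrative assumptions), a Scala or Java application can be submitted like this:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# java_class selects the entry point, packages resolves Maven coordinates at submit time,
# and application_args are passed through to the application's main method.
aggregate_events = SparkSubmitOperator(
    task_id='aggregate_events',
    application='/opt/jobs/events-assembly.jar',        # assumed JAR path
    java_class='com.example.AggregateEvents',           # assumed main class
    packages='org.apache.spark:spark-avro_2.12:3.5.0',  # assumed dependency
    application_args=['--date', '{{ ds }}', '--output', '/data/aggregated'],
    executor_memory='4g',
    num_executors=10,
    name='aggregate-events-{{ ds }}',
    conn_id='spark_default',
)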
Manage connections to various Spark interfaces including traditional spark-submit, Spark SQL, JDBC operations, and the modern Spark Connect protocol. Provides connection configuration, authentication, and cluster communication.

class SparkSubmitHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"

    def submit(self, application: str, **kwargs) -> None: ...
    def on_kill(self) -> None: ...
    def get_conn(self): ...
class SparkConnectHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"
    def get_connection_url(self) -> str: ...
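As a minimal sketch of using a hook directly (assuming a spark_connect connection has been configured and that pyspark 3.4+ with Spark Connect support is installed in the worker environment), the URL resolved by SparkConnectHook can be handed to a remote SparkSession inside an ordinary Python task:

from airflow.decorators import task
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

@task
def count_active_users() -> int:
    # Resolve the sc://host:port URL from the configured Airflow connection.
    url = SparkConnectHook(conn_id=SparkConnectHook.default_conn_name).get_connection_url()

    from pyspark.sql import SparkSession  # requires pyspark>=3.4

    spark = SparkSession.builder.remote(url).getOrCreate()
    try:
        return spark.read.parquet('/data/users.parquet').where('active = true').count()
    finally:
        spark.stop()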
Create PySpark tasks that automatically receive Spark session objects, enabling seamless integration of PySpark code within Airflow workflows with automatic session management and cleanup.

def pyspark_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator: ...
# Usage example:
@pyspark_task
def my_spark_function(spark):
"""Function receives SparkSession as 'spark' parameter"""
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
return df.count()The provider registers these connection types in Airflow:
spark - For SparkSubmitHook connections to Spark clusters
spark_sql - For SparkSqlHook connections to execute SQL queries
spark_jdbc - For SparkJDBCHook connections for database transfers
spark_connect - For SparkConnectHook connections using the Spark Connect protocol

Each connection type provides custom UI fields for configuration including cluster URLs, authentication credentials, SSL settings, and deployment-specific parameters.
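A hedged sketch of registering one of these connection types without the UI, using Airflow's standard AIRFLOW_CONN_* environment-variable mechanism; the endpoint and port are illustrative assumptions, and the extra fields each connection type accepts should be checked against the corresponding hook's documentation:

from airflow.models import Connection

# Build a spark_connect connection and derive the URI form Airflow expects in an
# AIRFLOW_CONN_SPARK_CONNECT_DEFAULT environment variable.
conn = Connection(
    conn_id='spark_connect_default',
    conn_type='spark_connect',
    host='spark-connect.example.com',   # assumed Spark Connect endpoint
    port=15002,                         # conventional Spark Connect port
)
print(conn.get_uri())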
Common exceptions that may be raised:
AirflowException - General Spark job execution failures, configuration errors
AirflowNotFoundException - Missing Spark applications, connection configurations

Handle these exceptions in your DAG error handling and retry logic as appropriate for your use case.
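A minimal sketch of that handling, with the retry settings and callback name as illustrative assumptions: transient submission failures are retried by the operator, while the failure callback fires once retries are exhausted and receives the raised exception in its context.

from datetime import timedelta

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

def notify_on_failure(context):
    # Hypothetical callback: forward the failure to your alerting system of choice.
    task_id = context['task_instance'].task_id
    print(f"Spark task {task_id} failed: {context.get('exception')}")

spark_job = SparkSubmitOperator(
    task_id='run_spark_job',
    application='/path/to/spark_app.py',     # assumed application path
    conn_id='spark_default',
    retries=3,                               # retry transient submission failures
    retry_delay=timedelta(minutes=10),
    on_failure_callback=notify_on_failure,   # runs after retries are exhausted
)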