Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, sensors, and decorators for distributed data processing workflows.
—
Quality: Pending — a review of whether it follows best practices has not yet been completed.
Impact: Pending — no eval scenarios have been run.
Operators for executing various types of Spark jobs within Airflow workflows. These operators provide task-level components that handle Spark application submission, SQL query execution, and JDBC database operations with comprehensive configuration and monitoring capabilities.
Execute Spark applications using the spark-submit binary with full support for cluster managers, resource configuration, authentication, and dependency management.
class SparkSubmitOperator(BaseOperator):
    """
    Execute Spark applications via spark-submit binary.

    Parameters:
    - application (str): Path to Spark application file (.py, .jar, .scala, .R) (default: "")
    - conf (dict[Any, Any] | None): Spark configuration properties as key-value pairs
    - conn_id (str): Airflow connection ID for Spark cluster (default: "spark_default")
    - files (str | None): Comma-separated list of files to place in working directory
    - py_files (str | None): Comma-separated list of .zip, .egg, .py files for Python path
    - archives (str | None): Comma-separated list of archives to extract in working directory
    - driver_class_path (str | None): Extra classpath entries for driver
    - jars (str | None): Comma-separated list of JAR files to include
    - java_class (str | None): Main class for Java/Scala applications
    - packages (str | None): Maven coordinates of packages to include
    - exclude_packages (str | None): Maven coordinates of packages to exclude
    - repositories (str | None): Additional remote repositories for dependency resolution
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor (e.g., '4g', '2048m')
    - driver_memory (str | None): Memory for driver (e.g., '2g', '1024m')
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - proxy_user (str | None): User to impersonate when running job
    - name (str): Name for Spark application (default: "arrow-spark")
    - num_executors (int | None): Number of executors to launch
    - application_args (list[Any] | None): Arguments passed to main method of application
    - env_vars (dict[str, Any] | None): Environment variables for Spark application
    - verbose (bool): Enable verbose output (default: False)
    - spark_binary (str | None): Spark binary to use (uses connection setting if not specified)
    - properties_file (str | None): Path to properties file with Spark configuration
    - yarn_queue (str | None): YARN queue to submit to
    - deploy_mode (str | None): Deploy mode (client or cluster)
    - status_poll_interval (int): Seconds between polls of driver status (default: 1)
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    - openlineage_inject_parent_job_info (bool): Inject OpenLineage parent job info (default: False)
    - openlineage_inject_transport_info (bool): Inject OpenLineage transport info (default: False)

    Template Fields: application, conf, files, py_files, jars, driver_class_path,
    packages, exclude_packages, keytab, principal, proxy_user, name,
    application_args, env_vars, properties_file
    """

    # Attributes rendered by Jinja before the task runs.
    template_fields = (
        "application", "conf", "files", "py_files", "jars",
        "driver_class_path", "packages", "exclude_packages",
        "keytab", "principal", "proxy_user", "name",
        "application_args", "env_vars", "properties_file",
    )

    def __init__(
        self,
        *,
        application: str = "",
        conf: dict[Any, Any] | None = None,
        conn_id: str = "spark_default",
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = "arrow-spark",
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list[Any] | None = None,
        env_vars: dict[str, Any] | None = None,
        verbose: bool = False,
        spark_binary: str | None = None,
        properties_file: str | None = None,
        yarn_queue: str | None = None,
        deploy_mode: str | None = None,
        use_krb5ccache: bool = False,
        openlineage_inject_parent_job_info: bool = False,
        openlineage_inject_transport_info: bool = False,
        **kwargs,
    ): ...

    def execute(self, context) -> None:
        """Execute Spark application using SparkSubmitHook."""

    def on_kill(self) -> None:
        """Kill running Spark job."""


from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
# Example: submit a PySpark application with adaptive query execution enabled.
pyspark_job = SparkSubmitOperator(
    task_id='process_data',
    dag=dag,
    conn_id='spark_cluster',
    application='/path/to/data_processing.py',
    py_files='s3://bucket/dependencies.zip',
    application_args=['--input', 's3://bucket/input/', '--output', 's3://bucket/output/'],
    env_vars={'SPARK_ENV': 'production'},
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
)
# Submit Scala/Java application
scala_job = SparkSubmitOperator(
task_id='run_scala_job',
application='/path/to/app.jar',
java_class='com.example.SparkApplication',
jars='/path/to/additional.jar',
packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
conf={
'spark.executor.instances': '10',
'spark.executor.memory': '8g',
},
dag=dag,
)Execute Spark SQL queries using the spark-sql binary with support for Hive integration, query templating, and various output formats.
class SparkSqlOperator(BaseOperator):
    """
    Execute Spark SQL queries via spark-sql binary.

    Parameters:
    - sql (str): SQL query to execute (can be templated)
    - conf (dict[str, Any] | str | None): Spark configuration properties
    - conn_id (str): Airflow connection ID (default: "spark_sql_default")
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - master (str | None): Cluster manager URL (default: None, uses connection)
    - name (str): Name for Spark application (default: "default-name")
    - num_executors (int | None): Number of executors
    - verbose (bool): Enable verbose output (default: True)
    - yarn_queue (str | None): YARN queue name

    Template Fields: sql
    Template Extensions: .sql, .hql
    """

    # "sql" is Jinja-templated; values ending in .sql/.hql are loaded from file.
    template_fields = ("sql",)
    template_ext = (".sql", ".hql")
    template_fields_renderers = {"sql": "sql"}

    def __init__(
        self,
        *,
        sql: str,
        conf: dict[str, Any] | str | None = None,
        conn_id: str = "spark_sql_default",
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str | None = None,
        name: str = "default-name",
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str | None = None,
        **kwargs,
    ): ...

    def execute(self, context) -> None:
        """Execute SQL query using SparkSqlHook."""

    def on_kill(self) -> None:
        """Kill running SQL query."""


from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
# Example: aggregate user data per region; '{{ ds }}' is rendered by Jinja
# with the logical date at run time.
USER_SUMMARY_SQL = """
CREATE TABLE user_summary AS
SELECT region,
COUNT(*) as user_count,
AVG(age) as avg_age,
SUM(total_purchases) as total_revenue
FROM users
WHERE active = true
AND registration_date >= '{{ ds }}'
GROUP BY region
"""

sql_analysis = SparkSqlOperator(
    task_id='analyze_user_data',
    sql=USER_SUMMARY_SQL,
    conn_id='spark_sql_cluster',
    conf={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    dag=dag,
)
# Execute SQL from file
sql_from_file = SparkSqlOperator(
task_id='run_etl_script',
sql='etl_queries.sql', # File in DAG folder or templated path
conn_id='spark_sql_default',
dag=dag,
)Transfer data between Spark and JDBC databases with support for batch processing, partitioning, and various save modes.
class SparkJDBCOperator(SparkSubmitOperator):
    """
    Execute Spark JDBC operations to transfer data between Spark and databases.

    Inherits from SparkSubmitOperator for Spark configuration.

    Parameters:
    - spark_app_name (str): Name for Spark application (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict[str, Any] | None): Spark configuration properties
    - spark_py_files (str | None): Python files for Spark
    - spark_files (str | None): Files for Spark
    - spark_jars (str | None): JAR files for Spark (include JDBC drivers)
    - num_executors (int | None): Number of Spark executors
    - executor_cores (int | None): Cores per executor
    - executor_memory (str | None): Memory per executor
    - driver_memory (str | None): Driver memory
    - verbose (bool): Enable verbose output (default: False)
    - principal (str | None): Kerberos principal
    - keytab (str | None): Kerberos keytab path
    - cmd_type (str): Operation type ('spark_to_jdbc' or 'jdbc_to_spark')
    - jdbc_table (str | None): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str | None): JDBC driver class name
    - metastore_table (str | None): Spark metastore table name
    - jdbc_truncate (bool): Truncate JDBC table before write (default: False)
    - save_mode (str | None): Save mode ('append', 'overwrite', 'ignore', 'error')
    - save_format (str | None): Save format ('json', 'parquet', 'csv', etc.)
    - batch_size (int | None): JDBC batch size for writes
    - fetch_size (int | None): JDBC fetch size for reads
    - num_partitions (int | None): Number of partitions for parallel reads/writes
    - partition_column (str | None): Column for partitioning JDBC reads
    - lower_bound (str | None): Lower bound for partition column
    - upper_bound (str | None): Upper bound for partition column
    - create_table_column_types (str | None): Column types for table creation
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    """

    def __init__(
        self,
        # NOTE: optional parameters are annotated `X | None` (the originals used
        # bare `str = None` / `int = None`, which is incorrect typing); names,
        # order, and defaults are unchanged, so existing callers are unaffected.
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict[str, Any] | None = None,
        spark_py_files: str | None = None,
        spark_files: str | None = None,
        spark_jars: str | None = None,
        num_executors: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        verbose: bool = False,
        principal: str | None = None,
        keytab: str | None = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str | None = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str | None = None,
        metastore_table: str | None = None,
        jdbc_truncate: bool = False,
        save_mode: str | None = None,
        save_format: str | None = None,
        batch_size: int | None = None,
        fetch_size: int | None = None,
        num_partitions: int | None = None,
        partition_column: str | None = None,
        lower_bound: str | None = None,
        upper_bound: str | None = None,
        create_table_column_types: str | None = None,
        use_krb5ccache: bool = False,
        **kwargs,
    ): ...

    def execute(self, context) -> None:
        """Execute JDBC transfer using SparkJDBCHook."""

    def on_kill(self) -> None:
        """Kill running JDBC job."""


from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
# Example: export a Spark metastore table into Postgres over JDBC.
spark_to_db = SparkJDBCOperator(
    task_id='export_results_to_db',
    dag=dag,
    cmd_type='spark_to_jdbc',
    # Spark side of the transfer.
    spark_conn_id='spark_cluster',
    spark_jars='postgresql-42.2.18.jar',
    metastore_table='processed_data',
    # JDBC side of the transfer.
    jdbc_conn_id='postgres_default',
    jdbc_driver='org.postgresql.Driver',
    jdbc_table='analytics_results',
    jdbc_truncate=True,
    save_mode='overwrite',
    batch_size=10000,
)
# Transfer data from database to Spark with partitioning
db_to_spark = SparkJDBCOperator(
task_id='load_data_from_db',
spark_conn_id='spark_cluster',
jdbc_conn_id='mysql_warehouse',
cmd_type='jdbc_to_spark',
jdbc_table='large_table',
metastore_table='imported_data',
jdbc_driver='com.mysql.cj.jdbc.Driver',
spark_jars='mysql-connector-java-8.0.23.jar',
num_partitions=10,
partition_column='id',
lower_bound='1',
upper_bound='1000000',
fetch_size=50000,
dag=dag,
)Common exceptions and error scenarios:
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-spark