Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, sensors, and decorators for distributed data processing workflows.
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Connection management hooks for various Spark interfaces including spark-submit, Spark SQL, JDBC operations, and Spark Connect protocol. These hooks handle authentication, connection configuration, and communication with different Spark deployment modes.
Manage connections for spark-submit operations across different cluster managers with support for comprehensive Spark configuration and resource management.
class SparkSubmitHook(BaseHook):
    """
    Hook for spark-submit binary execution with extensive configuration support.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark"
    - hook_name: "Spark"

    Parameters:
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_default')
    - files (str): Files to place in working directory
    - py_files (str): Python files for Python path
    - archives (str): Archives to extract
    - driver_class_path (str): Extra classpath for driver
    - jars (str): JAR files to include
    - java_class (str): Main class for Java/Scala apps
    - packages (str): Maven packages to include
    - exclude_packages (str): Maven packages to exclude
    - repositories (str): Additional repositories
    - total_executor_cores (int): Total cores for executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - proxy_user (str): User to impersonate
    - name (str): Application name
    - num_executors (int): Number of executors
    - status_poll_interval (int): Poll interval in seconds (default: 1)
    - application_args (list): Application arguments
    - env_vars (dict): Environment variables
    - verbose (bool): Verbose output (default: False)
    - spark_binary (str): Spark binary (default: 'spark-submit')
    - properties_file (str): Properties file path
    - yarn_queue (str): YARN queue name
    - deploy_mode (str): Deploy mode (client or cluster)
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"

    def __init__(
        self,
        # Optional parameters default to None, so annotations are `X | None`.
        conf: dict | None = None,
        conn_id: str = 'spark_default',
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = 'default-name',
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list | None = None,
        env_vars: dict | None = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str | None = None,
        yarn_queue: str | None = None,
        deploy_mode: str | None = None,
        *,
        use_krb5ccache: bool = False,
    ): ...

    def submit(self, application: str, **kwargs) -> None:
        """
        Submit Spark application for execution.

        Parameters:
        - application (str): Path to Spark application file
        - **kwargs: Additional arguments override hook defaults
        """

    def on_kill(self) -> None:
        """Kill the running Spark job."""

    def get_conn(self):
        """Get connection (no-op for Spark submit)."""

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""


from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
# Create hook with configuration
hook = SparkSubmitHook(
conn_id='spark_cluster',
conf={
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.driver.memory': '2g',
},
packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
verbose=True
)
# Submit application
hook.submit(
application='/path/to/app.py',
application_args=['--input', 'hdfs://data/input', '--output', 'hdfs://data/output']
)Manage connections for Spark SQL operations with support for Hive integration and distributed SQL execution.
class SparkSqlHook(BaseHook):
    """
    Hook for spark-sql binary execution with SQL query support.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_sql_default"
    - conn_type: "spark_sql"
    - hook_name: "Spark SQL"

    Parameters:
    - sql (str): SQL query to execute
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_sql_default')
    - total_executor_cores (int): Total cores for executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - master (str): Cluster manager URL (default: 'yarn')
    - name (str): Application name (default: 'default')
    - num_executors (int): Number of executors
    - verbose (bool): Verbose output (default: True)
    - yarn_queue (str): YARN queue (default: 'default')
    - properties_file (str): Properties file path
    - application_args (list): Additional arguments
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_sql_default"
    conn_type = "spark_sql"
    hook_name = "Spark SQL"

    def __init__(
        self,
        # Optional parameters default to None, so annotations are `X | None`.
        sql: str | None = None,
        conf: dict | None = None,
        conn_id: str = 'spark_sql_default',
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str = 'yarn',
        name: str = 'default',
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str = 'default',
        properties_file: str | None = None,
        application_args: list | None = None,
    ): ...

    def run_query(self, cmd: str | None = None, **kwargs) -> None:
        """
        Execute Spark SQL query.

        Parameters:
        - cmd (str): SQL command to execute (overrides sql parameter)
        - **kwargs: Additional execution parameters
        """

    def kill(self) -> None:
        """Kill the running SQL query."""

    def get_conn(self):
        """Get connection (no-op for Spark SQL)."""

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""


from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook
# Create hook for SQL execution
hook = SparkSqlHook(
conn_id='spark_sql_cluster',
conf={
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true',
},
master='yarn',
yarn_queue='analytics'
)
# Execute SQL query
hook.run_query("""
CREATE TABLE daily_summary AS
SELECT date, region, COUNT(*) as transactions
FROM sales
WHERE date = current_date()
GROUP BY date, region
""")Manage connections for Spark JDBC operations with database integration support including authentication and connection pooling.
class SparkJDBCHook(SparkSubmitHook):
    """
    Hook for Spark JDBC operations, extends SparkSubmitHook.

    Connection Information:
    - conn_name_attr: "spark_conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark_jdbc"
    - hook_name: "Spark JDBC"

    Parameters:
    - spark_app_name (str): Spark application name (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict): Spark configuration
    - spark_py_files (str): Python files
    - spark_files (str): Additional files
    - spark_jars (str): JAR files (include JDBC drivers)
    - num_executors (int): Number of executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - verbose (bool): Verbose output
    - keytab (str): Kerberos keytab
    - principal (str): Kerberos principal
    - cmd_type (str): Operation type ('spark_to_jdbc', 'jdbc_to_spark')
    - jdbc_table (str): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str): JDBC driver class
    - metastore_table (str): Spark metastore table
    - jdbc_truncate (bool): Truncate table before write
    - save_mode (str): Save mode for writes
    - save_format (str): Data format
    - batch_size (int): JDBC batch size
    - fetch_size (int): JDBC fetch size
    - num_partitions (int): Number of partitions
    - partition_column (str): Partitioning column
    - lower_bound (str): Partition lower bound
    - upper_bound (str): Partition upper bound
    - create_table_column_types (str): Column types for table creation
    - use_krb5ccache (bool): Use Kerberos credential cache
    """

    conn_name_attr = "spark_conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark_jdbc"
    hook_name = "Spark JDBC"

    def __init__(
        self,
        # Optional parameters default to None, so annotations are `X | None`.
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict | None = None,
        spark_py_files: str | None = None,
        spark_files: str | None = None,
        spark_jars: str | None = None,
        num_executors: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        verbose: bool = False,
        keytab: str | None = None,
        principal: str | None = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str | None = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str | None = None,
        metastore_table: str | None = None,
        jdbc_truncate: bool = False,
        save_mode: str | None = None,
        save_format: str | None = None,
        batch_size: int | None = None,
        fetch_size: int | None = None,
        num_partitions: int | None = None,
        partition_column: str | None = None,
        lower_bound: str | None = None,
        upper_bound: str | None = None,
        create_table_column_types: str | None = None,
        use_krb5ccache: bool = False,
    ): ...

    def submit_jdbc_job(self) -> None:
        """Submit Spark JDBC transfer job."""

    def get_conn(self):
        """Get connection (no-op for Spark JDBC)."""


from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook
# Create hook for database transfer
hook = SparkJDBCHook(
spark_conn_id='spark_cluster',
jdbc_conn_id='postgres_warehouse',
cmd_type='jdbc_to_spark',
jdbc_table='customer_data',
metastore_table='customers',
jdbc_driver='org.postgresql.Driver',
spark_jars='postgresql-42.2.18.jar',
num_partitions=8,
partition_column='customer_id',
lower_bound='1',
upper_bound='1000000'
)
# Execute transfer
hook.submit_jdbc_job()Manage connections using the modern Spark Connect protocol for improved performance and scalability.
class SparkConnectHook(BaseHook):
    """
    Hook for Spark Connect protocol connections.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_connect_default"
    - conn_type: "spark_connect"
    - hook_name: "Spark Connect"

    Constants:
    - PARAM_USE_SSL: "use_ssl"
    - PARAM_TOKEN: "token"
    - PARAM_USER_ID: "user_id"

    Parameters:
    - conn_id (str): Connection ID (default: 'spark_connect_default')
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"

    # Extra-field parameter keys used when building the connection URL.
    PARAM_USE_SSL = "use_ssl"
    PARAM_TOKEN = "token"
    PARAM_USER_ID = "user_id"

    def __init__(self, conn_id: str = 'spark_connect_default'): ...

    def get_connection_url(self) -> str:
        """
        Build Spark Connect connection URL.

        Returns:
        str: Complete Spark Connect URL for client connections
        """

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""


from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook
# Create Spark Connect hook
hook = SparkConnectHook(conn_id='spark_connect_cluster')
# Get connection URL for Spark Connect client
connect_url = hook.get_connection_url()
# Returns: sc://hostname:15002/;token=abc123;user_id=airflow
# Use with Spark Connect client
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote(connect_url) \
.appName("Airflow Spark Connect") \
.getOrCreate()spark)yarn, spark://master:7077, k8s://api-server)spark_sql)spark_jdbc)spark_connect)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-spark