tessl/pypi-apache-airflow-providers-apache-spark

Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, sensors, and decorators for distributed data processing workflows.


docs/spark-hooks.md

Spark Hooks

Connection management hooks for various Spark interfaces including spark-submit, Spark SQL, JDBC operations, and Spark Connect protocol. These hooks handle authentication, connection configuration, and communication with different Spark deployment modes.

Capabilities

Spark Submit Connection Management

Manage connections for spark-submit operations across different cluster managers with support for comprehensive Spark configuration and resource management.

class SparkSubmitHook(BaseHook):
    """
    Hook for spark-submit binary execution with extensive configuration support.
    
    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark"
    - hook_name: "Spark"
    
    Parameters:
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_default')
    - files (str): Files to place in working directory
    - py_files (str): Python files for Python path
    - archives (str): Archives to extract
    - driver_class_path (str): Extra classpath for driver
    - jars (str): JAR files to include
    - java_class (str): Main class for Java/Scala apps
    - packages (str): Maven packages to include
    - exclude_packages (str): Maven packages to exclude
    - repositories (str): Additional repositories
    - total_executor_cores (int): Total cores for executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - proxy_user (str): User to impersonate
    - name (str): Application name
    - num_executors (int): Number of executors
    - status_poll_interval (int): Poll interval in seconds (default: 1)
    - application_args (list): Application arguments
    - env_vars (dict): Environment variables
    - verbose (bool): Verbose output (default: False)
    - spark_binary (str): Spark binary (default: 'spark-submit')
    - properties_file (str): Properties file path
    - yarn_queue (str): YARN queue name  
    - deploy_mode (str): Deploy mode (client or cluster)
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    """
    
    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"
    
    def __init__(
        self,
        conf: dict | None = None,
        conn_id: str = 'spark_default',
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = 'default-name',
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list | None = None,
        env_vars: dict | None = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str | None = None,
        yarn_queue: str | None = None,
        deploy_mode: str | None = None,
        *,
        use_krb5ccache: bool = False
    ): ...
    
    def submit(self, application: str, **kwargs) -> None:
        """
        Submit Spark application for execution.
        
        Parameters:
        - application (str): Path to Spark application file
        - **kwargs: Additional arguments override hook defaults
        """
        
    def on_kill(self) -> None:
        """Kill the running Spark job."""
        
    def get_conn(self):
        """Get connection (no-op for Spark submit)."""
        
    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""
        
    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""

Usage Example

from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Create hook with configuration
hook = SparkSubmitHook(
    conn_id='spark_cluster',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
    },
    packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    verbose=True
)

# Submit application
hook.submit(
    application='/path/to/app.py',
    application_args=['--input', 'hdfs://data/input', '--output', 'hdfs://data/output']
)

Spark SQL Connection Management

Manage connections for Spark SQL operations with support for Hive integration and distributed SQL execution.

class SparkSqlHook(BaseHook):
    """
    Hook for spark-sql binary execution with SQL query support.
    
    Connection Information:
    - conn_name_attr: "conn_id"  
    - default_conn_name: "spark_sql_default"
    - conn_type: "spark_sql"
    - hook_name: "Spark SQL"
    
    Parameters:
    - sql (str): SQL query to execute
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_sql_default')
    - total_executor_cores (int): Total cores for executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - master (str): Cluster manager URL (default: 'yarn')
    - name (str): Application name (default: 'default')
    - num_executors (int): Number of executors
    - verbose (bool): Verbose output (default: True)
    - yarn_queue (str): YARN queue (default: 'default')
    - properties_file (str): Properties file path
    - application_args (list): Additional arguments
    """
    
    conn_name_attr = "conn_id"
    default_conn_name = "spark_sql_default"
    conn_type = "spark_sql"
    hook_name = "Spark SQL"
    
    def __init__(
        self,
        sql: str | None = None,
        conf: dict | None = None,
        conn_id: str = 'spark_sql_default',
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str = 'yarn',
        name: str = 'default',
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str = 'default',
        properties_file: str | None = None,
        application_args: list | None = None
    ): ...
    
    def run_query(self, cmd: str | None = None, **kwargs) -> None:
        """
        Execute Spark SQL query.
        
        Parameters:
        - cmd (str): SQL command to execute (overrides sql parameter)
        - **kwargs: Additional execution parameters
        """
        
    def kill(self) -> None:
        """Kill the running SQL query."""
        
    def get_conn(self):
        """Get connection (no-op for Spark SQL)."""
        
    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""
        
    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""

Usage Example

from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

# Create hook for SQL execution
hook = SparkSqlHook(
    conn_id='spark_sql_cluster',
    conf={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    master='yarn',
    yarn_queue='analytics'
)

# Execute SQL query
hook.run_query("""
    CREATE TABLE daily_summary AS
    SELECT date, region, COUNT(*) as transactions
    FROM sales 
    WHERE date = current_date()
    GROUP BY date, region
""")

Spark JDBC Connection Management

Manage connections for Spark JDBC operations with database integration support including authentication and connection pooling.

class SparkJDBCHook(SparkSubmitHook):
    """
    Hook for Spark JDBC operations, extends SparkSubmitHook.
    
    Connection Information:
    - conn_name_attr: "spark_conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark_jdbc"
    - hook_name: "Spark JDBC"
    
    Parameters:
    - spark_app_name (str): Spark application name (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict): Spark configuration
    - spark_py_files (str): Python files
    - spark_files (str): Additional files
    - spark_jars (str): JAR files (include JDBC drivers)
    - num_executors (int): Number of executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - verbose (bool): Verbose output
    - keytab (str): Kerberos keytab
    - principal (str): Kerberos principal
    - cmd_type (str): Operation type ('spark_to_jdbc', 'jdbc_to_spark')
    - jdbc_table (str): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str): JDBC driver class
    - metastore_table (str): Spark metastore table
    - jdbc_truncate (bool): Truncate table before write
    - save_mode (str): Save mode for writes
    - save_format (str): Data format
    - batch_size (int): JDBC batch size
    - fetch_size (int): JDBC fetch size
    - num_partitions (int): Number of partitions
    - partition_column (str): Partitioning column
    - lower_bound (str): Partition lower bound
    - upper_bound (str): Partition upper bound
    - create_table_column_types (str): Column types for table creation
    - use_krb5ccache (bool): Use Kerberos credential cache
    """
    
    conn_name_attr = "spark_conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark_jdbc"
    hook_name = "Spark JDBC"
    
    def __init__(
        self,
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict | None = None,
        spark_py_files: str | None = None,
        spark_files: str | None = None,
        spark_jars: str | None = None,
        num_executors: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        verbose: bool = False,
        keytab: str | None = None,
        principal: str | None = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str | None = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str | None = None,
        metastore_table: str | None = None,
        jdbc_truncate: bool = False,
        save_mode: str | None = None,
        save_format: str | None = None,
        batch_size: int | None = None,
        fetch_size: int | None = None,
        num_partitions: int | None = None,
        partition_column: str | None = None,
        lower_bound: str | None = None,
        upper_bound: str | None = None,
        create_table_column_types: str | None = None,
        use_krb5ccache: bool = False
    ): ...
    
    def submit_jdbc_job(self) -> None:
        """Submit Spark JDBC transfer job."""
        
    def get_conn(self):
        """Get connection (no-op for Spark JDBC)."""

Usage Example

from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook

# Create hook for database transfer
hook = SparkJDBCHook(
    spark_conn_id='spark_cluster',
    jdbc_conn_id='postgres_warehouse',
    cmd_type='jdbc_to_spark',
    jdbc_table='customer_data',
    metastore_table='customers',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='postgresql-42.2.18.jar',
    num_partitions=8,
    partition_column='customer_id',
    lower_bound='1',
    upper_bound='1000000'
)

# Execute transfer
hook.submit_jdbc_job()
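With the partitioning options above, Spark issues one JDBC query per partition over roughly equal ranges of `partition_column`. A simplified sketch of how `lower_bound`, `upper_bound`, and `num_partitions` split the read (an approximation of Spark's internal stride computation, not the provider's code):

```python
def partition_ranges(lower: int, upper: int, num_partitions: int) -> list:
    """Approximate the per-partition ranges Spark derives from
    lowerBound/upperBound/numPartitions for a partitioned JDBC read."""
    # Each partition covers roughly (upper - lower) / num_partitions values.
    stride = (upper - lower) // num_partitions
    bounds = [lower + i * stride for i in range(num_partitions)] + [upper]
    # Note: Spark leaves the first and last partitions unbounded (<= / >)
    # so no rows are missed; this sketch collapses them to closed ranges.
    return list(zip(bounds[:-1], bounds[1:]))
```

For the example above (`lower_bound='1'`, `upper_bound='1000000'`, `num_partitions=8`), each of the eight executors reads a slice of about 125,000 customer IDs in parallel, so a skewed `partition_column` distribution leads directly to skewed partition sizes.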

Spark Connect Connection Management

Manage connections for Spark Connect, the client-server protocol introduced in Spark 3.4 that lets lightweight clients run DataFrame and SQL workloads against a remote Spark cluster without embedding a driver.

class SparkConnectHook(BaseHook):
    """
    Hook for Spark Connect protocol connections.
    
    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_connect_default"
    - conn_type: "spark_connect" 
    - hook_name: "Spark Connect"
    
    Constants:
    - PARAM_USE_SSL: "use_ssl"
    - PARAM_TOKEN: "token"
    - PARAM_USER_ID: "user_id"
    
    Parameters:
    - conn_id (str): Connection ID (default: 'spark_connect_default')
    """
    
    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"
    
    PARAM_USE_SSL = "use_ssl"
    PARAM_TOKEN = "token"
    PARAM_USER_ID = "user_id"
    
    def __init__(self, conn_id: str = 'spark_connect_default'): ...
    
    def get_connection_url(self) -> str:
        """
        Build Spark Connect connection URL.
        
        Returns:
        str: Complete Spark Connect URL for client connections
        """
        
    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""
        
    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""

Usage Example

from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

# Create Spark Connect hook
hook = SparkConnectHook(conn_id='spark_connect_cluster')

# Get connection URL for Spark Connect client
connect_url = hook.get_connection_url()
# e.g. sc://hostname:15002/;token=abc123;user_id=airflow

# Use with Spark Connect client
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote(connect_url) \
    .appName("Airflow Spark Connect") \
    .getOrCreate()

Connection Configuration

Spark Connection (spark)

  • Host: Cluster manager URL (e.g., yarn, spark://master:7077, k8s://api-server)
  • Extra: JSON with additional Spark configuration
  • Login/Password: For cluster authentication if required

Spark SQL Connection (spark_sql)

  • Host: Cluster manager URL
  • Extra: JSON with Spark SQL specific configuration
  • Schema: Default database/schema

Spark JDBC Connection (spark_jdbc)

  • Inherits: Spark connection configuration
  • Extra: JDBC-specific settings and driver configuration

Spark Connect Connection (spark_connect)

  • Host: Spark Connect server hostname
  • Port: Spark Connect server port (default: 15002)
  • Extra: JSON with SSL settings, tokens, and user authentication
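The connections above can also be defined without the UI, using Airflow's standard environment-variable or CLI mechanisms. A minimal sketch (hostname, queue, and connection IDs are placeholders):

```shell
# AIRFLOW_CONN_<CONN_ID> is Airflow's standard env-var connection format;
# query parameters in the URI become keys in the connection's Extra field.
export AIRFLOW_CONN_SPARK_DEFAULT='spark://yarn?deploy-mode=cluster&queue=default'

# Equivalent definition via the Airflow CLI:
airflow connections add spark_default \
    --conn-type spark \
    --conn-host yarn \
    --conn-extra '{"deploy-mode": "cluster", "queue": "default"}'
```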

Error Handling and Best Practices

Connection Management

  • Connection pooling: Configure appropriate pool sizes for JDBC operations
  • Authentication: Ensure proper Kerberos or token-based authentication setup
  • SSL/TLS: Use secure connections for production environments
  • Timeouts: Configure appropriate connection and query timeouts

Resource Management

  • Memory allocation: Balance driver and executor memory based on workload
  • Core allocation: Optimize core allocation for cluster utilization
  • Dynamic allocation: Use Spark's dynamic allocation for variable workloads
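As a concrete illustration of dynamic allocation, the following dictionary uses standard Spark property names (the executor bounds are placeholders to tune per cluster) and can be passed as the `conf` argument of `SparkSubmitHook`:

```python
# Standard Spark dynamic-allocation properties; min/max executor counts
# are illustrative and should be sized to the cluster and workload.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "20",
    # External shuffle service is required for dynamic allocation on YARN.
    "spark.shuffle.service.enabled": "true",
}
```

With this in place, `SparkSubmitHook(conn_id='spark_cluster', conf=dynamic_allocation_conf)` lets Spark scale executors between the configured bounds instead of pinning `num_executors`.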

Error Recovery

  • Retry logic: Implement retry mechanisms for transient failures
  • Graceful degradation: Handle cluster unavailability scenarios
  • Resource monitoring: Monitor cluster resources and job progress
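A minimal retry wrapper for transient submission failures might look like the sketch below. The helper and its parameters are hypothetical (not part of the provider); in practice the callable would be something like `lambda: hook.submit(application='/path/to/app.py')`:

```python
import time

def submit_with_retries(submit_fn, max_attempts: int = 3, backoff_seconds: float = 30.0):
    """Call submit_fn, retrying on failure with linear backoff.

    submit_fn: zero-argument callable that raises on failure,
    e.g. wrapping SparkSubmitHook.submit.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return submit_fn()
        except Exception:
            if attempt == max_attempts:
                # Exhausted retries: surface the last failure to the task.
                raise
            # Linear backoff between attempts; exponential is also common.
            time.sleep(backoff_seconds * attempt)
```

Keeping the retry loop outside the hook also plays well with Airflow's own task-level `retries`, which handle coarser failures such as a lost worker.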

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-spark
