tessl/pypi-apache-airflow-providers-apache-spark

Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, sensors, and decorators for distributed data processing workflows.

docs/spark-operators.md

Spark Operators

Operators for executing various types of Spark jobs within Airflow workflows. These operators provide task-level components that handle Spark application submission, SQL query execution, and JDBC database operations with comprehensive configuration and monitoring capabilities.

Capabilities

Spark Application Submission

Execute Spark applications using the spark-submit binary with full support for cluster managers, resource configuration, authentication, and dependency management.

class SparkSubmitOperator(BaseOperator):
    """
    Execute Spark applications via spark-submit binary.
    
    Parameters:
    - application (str): Path to Spark application file (.py, .jar, .R) (default: "")
    - conf (dict[Any, Any] | None): Spark configuration properties as key-value pairs
    - conn_id (str): Airflow connection ID for Spark cluster (default: "spark_default")
    - files (str | None): Comma-separated list of files to place in working directory
    - py_files (str | None): Comma-separated list of .zip, .egg, .py files for Python path
    - archives (str | None): Comma-separated list of archives to extract in working directory
    - driver_class_path (str | None): Extra classpath entries for driver
    - jars (str | None): Comma-separated list of JAR files to include
    - java_class (str | None): Main class for Java/Scala applications
    - packages (str | None): Maven coordinates of packages to include
    - exclude_packages (str | None): Maven coordinates of packages to exclude
    - repositories (str | None): Additional remote repositories for dependency resolution
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor (e.g., '4g', '2048m')
    - driver_memory (str | None): Memory for driver (e.g., '2g', '1024m')
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - proxy_user (str | None): User to impersonate when running job
    - name (str): Name for Spark application (default: "arrow-spark")
    - num_executors (int | None): Number of executors to launch
    - application_args (list[Any] | None): Arguments passed to main method of application
    - env_vars (dict[str, Any] | None): Environment variables for Spark application
    - verbose (bool): Enable verbose output (default: False)
    - spark_binary (str | None): Spark binary to use (uses connection setting if not specified)
    - properties_file (str | None): Path to properties file with Spark configuration
    - yarn_queue (str | None): YARN queue to submit to
    - deploy_mode (str | None): Deploy mode (client or cluster)
    - status_poll_interval (int): Seconds between polls of driver status (default: 1)
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    - openlineage_inject_parent_job_info (bool): Inject OpenLineage parent job info (default: False)
    - openlineage_inject_transport_info (bool): Inject OpenLineage transport info (default: False)
    
    Template Fields: application, conf, files, py_files, jars, driver_class_path,
                    packages, exclude_packages, keytab, principal, proxy_user, name,
                    application_args, env_vars, properties_file
    """
    
    template_fields = (
        "application", "conf", "files", "py_files", "jars",
        "driver_class_path", "packages", "exclude_packages", 
        "keytab", "principal", "proxy_user", "name",
        "application_args", "env_vars", "properties_file"
    )
    
    def __init__(
        self,
        *,
        application: str = "",
        conf: dict[Any, Any] | None = None,
        conn_id: str = "spark_default",
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = "arrow-spark",
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list[Any] | None = None,
        env_vars: dict[str, Any] | None = None,
        verbose: bool = False,
        spark_binary: str | None = None,
        properties_file: str | None = None,
        yarn_queue: str | None = None,
        deploy_mode: str | None = None,
        use_krb5ccache: bool = False,
        openlineage_inject_parent_job_info: bool = False,
        openlineage_inject_transport_info: bool = False,
        **kwargs
    ): ...
    
    def execute(self, context) -> None:
        """Execute Spark application using SparkSubmitHook."""
        
    def on_kill(self) -> None:
        """Kill running Spark job."""

Usage Example

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Submit PySpark application
pyspark_job = SparkSubmitOperator(
    task_id='process_data',
    application='/path/to/data_processing.py',
    conn_id='spark_cluster',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    py_files='s3://bucket/dependencies.zip',
    application_args=['--input', 's3://bucket/input/', '--output', 's3://bucket/output/'],
    env_vars={'SPARK_ENV': 'production'},
    dag=dag,
)

# Submit Scala/Java application
scala_job = SparkSubmitOperator(
    task_id='run_scala_job',
    application='/path/to/app.jar',
    java_class='com.example.SparkApplication',
    jars='/path/to/additional.jar',
    packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    conf={
        'spark.executor.instances': '10',
        'spark.executor.memory': '8g',
    },
    dag=dag,
)
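Under the hood, the operator delegates to SparkSubmitHook, which assembles a spark-submit command line from these parameters. The sketch below is illustrative only: the helper name build_spark_submit_cmd is hypothetical and the provider's actual implementation differs, but the mapping from parameters to CLI flags follows the documented spark-submit interface.

```python
# Hypothetical helper sketching how operator parameters map onto
# spark-submit CLI flags; not the provider's actual code.
def build_spark_submit_cmd(application, conf=None, java_class=None,
                           executor_memory=None, application_args=None,
                           spark_binary="spark-submit"):
    cmd = [spark_binary]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]        # spark.* properties
    if java_class:
        cmd += ["--class", java_class]             # main class for JVM apps
    if executor_memory:
        cmd += ["--executor-memory", executor_memory]
    cmd.append(application)                        # the application file comes last,
    cmd += [str(a) for a in (application_args or [])]  # followed by its own args
    return cmd

cmd = build_spark_submit_cmd(
    "/path/to/data_processing.py",
    conf={"spark.executor.memory": "4g"},
    application_args=["--input", "s3://bucket/input/"],
)
print(" ".join(cmd))
```

Note that everything after the application path is passed through to the application itself, which is why application_args must come last.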

Spark SQL Execution

Execute Spark SQL queries using the spark-sql binary with support for Hive integration, query templating, and various output formats.

class SparkSqlOperator(BaseOperator):
    """
    Execute Spark SQL queries via spark-sql binary.
    
    Parameters:
    - sql (str): SQL query to execute (can be templated)
    - conf (dict[str, Any] | str | None): Spark configuration properties
    - conn_id (str): Airflow connection ID (default: "spark_sql_default")
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - master (str | None): Cluster manager URL (default: None, uses connection)
    - name (str): Name for Spark application (default: "default-name")
    - num_executors (int | None): Number of executors
    - verbose (bool): Enable verbose output (default: True)
    - yarn_queue (str | None): YARN queue name
    
    Template Fields: sql
    Template Extensions: .sql, .hql
    """
    
    template_fields = ("sql",)
    template_ext = (".sql", ".hql")
    template_fields_renderers = {"sql": "sql"}
    
    def __init__(
        self,
        *,
        sql: str,
        conf: dict[str, Any] | str | None = None,
        conn_id: str = "spark_sql_default",
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str | None = None,
        name: str = "default-name",
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str | None = None,
        **kwargs
    ): ...
    
    def execute(self, context) -> None:
        """Execute SQL query using SparkSqlHook."""
        
    def on_kill(self) -> None:
        """Kill running SQL query."""

Usage Example

from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

# Execute SQL query from string
sql_analysis = SparkSqlOperator(
    task_id='analyze_user_data',
    sql="""
        CREATE TABLE user_summary AS
        SELECT region, 
               COUNT(*) as user_count,
               AVG(age) as avg_age,
               SUM(total_purchases) as total_revenue
        FROM users 
        WHERE active = true 
          AND registration_date >= '{{ ds }}'
        GROUP BY region
    """,
    conn_id='spark_sql_cluster',
    conf={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    dag=dag,
)

# Execute SQL from file
sql_from_file = SparkSqlOperator(
    task_id='run_etl_script',
    sql='etl_queries.sql',  # File in DAG folder or templated path
    conn_id='spark_sql_default',
    dag=dag,
)
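The spark-sql binary accepts an inline query via -e and a script file via -f, plus the generic spark-submit options such as --conf and --name. The hypothetical helper below (not the provider's actual SparkSqlHook code) sketches that mapping, assuming a .sql/.hql suffix indicates a file path:

```python
# Hypothetical sketch of how SparkSqlOperator-style parameters could map
# onto spark-sql CLI flags; the real hook's internals differ.
def build_spark_sql_cmd(sql, conf=None, name="default-name"):
    cmd = ["spark-sql"]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd += ["--name", name]
    if sql.strip().endswith((".sql", ".hql")):
        cmd += ["-f", sql.strip()]   # run a script file
    else:
        cmd += ["-e", sql]           # run an inline query
    return cmd

inline = build_spark_sql_cmd("SELECT 1", conf={"spark.sql.adaptive.enabled": "true"})
from_file = build_spark_sql_cmd("etl_queries.sql")
```

In practice the template_ext mechanism means .sql/.hql paths are rendered by Airflow's templating engine before the command is built.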

Spark JDBC Operations

Transfer data between Spark and JDBC databases with support for batch processing, partitioning, and various save modes.

class SparkJDBCOperator(SparkSubmitOperator):
    """
    Execute Spark JDBC operations to transfer data between Spark and databases.
    Inherits from SparkSubmitOperator for Spark configuration.
    
    Parameters:
    - spark_app_name (str): Name for Spark application (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict[str, Any] | None): Spark configuration properties
    - spark_py_files (str | None): Python files for Spark
    - spark_files (str | None): Files for Spark
    - spark_jars (str | None): JAR files for Spark (include JDBC drivers)
    - num_executors (int | None): Number of Spark executors
    - executor_cores (int | None): Cores per executor
    - executor_memory (str | None): Memory per executor
    - driver_memory (str | None): Driver memory
    - verbose (bool): Enable verbose output (default: False)
    - principal (str | None): Kerberos principal
    - keytab (str | None): Kerberos keytab path
    - cmd_type (str): Operation type ('spark_to_jdbc' or 'jdbc_to_spark') (default: 'spark_to_jdbc')
    - jdbc_table (str | None): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str | None): JDBC driver class name
    - metastore_table (str | None): Spark metastore table name
    - jdbc_truncate (bool): Truncate JDBC table before write (default: False)
    - save_mode (str | None): Save mode ('append', 'overwrite', 'ignore', 'error')
    - save_format (str | None): Save format ('json', 'parquet', 'csv', etc.)
    - batch_size (int | None): JDBC batch size for writes
    - fetch_size (int | None): JDBC fetch size for reads
    - num_partitions (int | None): Number of partitions for parallel reads/writes
    - partition_column (str | None): Column for partitioning JDBC reads
    - lower_bound (str | None): Lower bound for partition column
    - upper_bound (str | None): Upper bound for partition column
    - create_table_column_types (str | None): Column types for table creation
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    """
    
    def __init__(
        self,
        *,
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict[str, Any] | None = None,
        spark_py_files: str | None = None,
        spark_files: str | None = None,
        spark_jars: str | None = None,
        num_executors: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        verbose: bool = False,
        principal: str | None = None,
        keytab: str | None = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str | None = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str | None = None,
        metastore_table: str | None = None,
        jdbc_truncate: bool = False,
        save_mode: str | None = None,
        save_format: str | None = None,
        batch_size: int | None = None,
        fetch_size: int | None = None,
        num_partitions: int | None = None,
        partition_column: str | None = None,
        lower_bound: str | None = None,
        upper_bound: str | None = None,
        create_table_column_types: str | None = None,
        use_krb5ccache: bool = False,
        **kwargs
    ): ...
    
    def execute(self, context) -> None:
        """Execute JDBC transfer using SparkJDBCHook."""
        
    def on_kill(self) -> None:
        """Kill running JDBC job."""

Usage Example

from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Transfer data from Spark to database
spark_to_db = SparkJDBCOperator(
    task_id='export_results_to_db',
    spark_conn_id='spark_cluster',
    jdbc_conn_id='postgres_default',
    cmd_type='spark_to_jdbc',
    metastore_table='processed_data',
    jdbc_table='analytics_results',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='postgresql-42.2.18.jar',
    save_mode='overwrite',
    jdbc_truncate=True,
    batch_size=10000,
    dag=dag,
)

# Transfer data from database to Spark with partitioning
db_to_spark = SparkJDBCOperator(
    task_id='load_data_from_db',
    spark_conn_id='spark_cluster',
    jdbc_conn_id='mysql_warehouse',
    cmd_type='jdbc_to_spark',
    jdbc_table='large_table',
    metastore_table='imported_data',
    jdbc_driver='com.mysql.cj.jdbc.Driver',
    spark_jars='mysql-connector-java-8.0.23.jar',
    num_partitions=10,
    partition_column='id',
    lower_bound='1',
    upper_bound='1000000',
    fetch_size=50000,
    dag=dag,
)
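The partitioning parameters give Spark's JDBC reader a numeric column and bounds from which it derives one WHERE predicate per partition, so reads run in parallel. The sketch below conveys the intuition only; Spark's actual stride arithmetic differs slightly, and the helper name is hypothetical:

```python
# Hedged approximation of how Spark splits a numeric partition_column into
# per-partition WHERE clauses; Spark's exact algorithm differs in detail.
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    stride = (upper_bound - lower_bound) // num_partitions
    predicates, current = [], lower_bound
    for i in range(num_partitions):
        if i == 0:
            # first partition also sweeps up NULLs and values below lower_bound
            predicates.append(f"{column} < {current + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # last partition is open-ended above
            predicates.append(f"{column} >= {current}")
        else:
            predicates.append(f"{column} >= {current} AND {column} < {current + stride}")
        current += stride
    return predicates

preds = jdbc_partition_predicates("id", 1, 1_000_000, 10)
```

Note that lower_bound and upper_bound only shape the partition boundaries; rows outside the bounds are still read, just by the first and last partitions.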

Error Handling

Common exceptions and error scenarios:

Application Errors

  • Application not found: Verify application path is accessible to Spark cluster
  • Class not found: Ensure main class exists and dependencies are included
  • Resource allocation failures: Check cluster capacity and resource requirements

Configuration Errors

  • Invalid Spark configuration: Validate conf parameters against Spark documentation
  • Connection failures: Verify connection configurations and cluster accessibility
  • Authentication errors: Check Kerberos settings, keytab files, and principals

JDBC Errors

  • Driver not found: Include JDBC driver JAR in spark_jars parameter
  • Connection failures: Verify JDBC connection settings and database accessibility
  • Table/column errors: Ensure target tables exist and column types are compatible

Best Practices

  • Use connection pooling for JDBC operations
  • Configure appropriate batch sizes for data transfers
  • Monitor cluster resources and adjust executor settings
  • Use partitioning for large dataset transfers
  • Implement proper error handling and retry logic in DAGs
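Because these operators inherit from BaseOperator, Airflow's standard task-level retry and timeout parameters apply to all of them. A minimal sketch of default_args for a DAG running Spark jobs, with illustrative values you would tune to your cluster:

```python
# Illustrative default_args for Spark task retries; values are examples only.
from datetime import timedelta

default_args = {
    "retries": 3,                             # re-run failed Spark submissions
    "retry_delay": timedelta(minutes=5),      # wait between attempts
    "retry_exponential_backoff": True,        # back off on repeated failures
    "execution_timeout": timedelta(hours=2),  # kill runaway jobs
}
```

Pass this dict as default_args when constructing the DAG so every Spark task inherits the same retry behavior.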

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-spark
