tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.

docs/spark-integration.md

Spark Integration

Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties for comprehensive lineage tracking across Spark jobs executed from Airflow.

Capabilities

Parent Job Information Injection

Function for injecting OpenLineage parent job information into Spark application properties.

def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict:
    """
    Inject OpenLineage parent job information into Spark application properties.
    
    Automatically injects parent job details (namespace, job name, run ID) from the
    current Airflow task context into Spark application properties. This enables
    Spark applications to emit lineage events that are properly linked to their
    parent Airflow jobs.
    
    Args:
        properties: Existing Spark application properties dictionary
        context: Airflow task context containing lineage information
        
    Returns:
        dict: Updated properties dictionary with injected parent job information
        
    Injected Properties:
        - spark.openlineage.parentJobNamespace: OpenLineage namespace
        - spark.openlineage.parentJobName: Parent job name
        - spark.openlineage.parentRunId: Parent run identifier
    """

Transport Information Injection

Function for injecting OpenLineage transport configuration into Spark application properties.

def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict:
    """
    Inject OpenLineage transport information into Spark application properties.
    
    Automatically injects transport configuration from Airflow's OpenLineage
    settings into Spark application properties. This allows Spark applications
    to emit lineage events to the same backend as Airflow without requiring
    separate configuration.
    
    Args:
        properties: Existing Spark application properties dictionary
        context: Airflow task context (used for configuration access)
        
    Returns:
        dict: Updated properties dictionary with injected transport configuration
        
    Injected Properties:
        - spark.openlineage.transport.type: Transport type (http, kafka, etc.)
        - spark.openlineage.transport.url: Transport URL (for HTTP transport)
        - spark.openlineage.transport.endpoint: API endpoint (for HTTP transport)
        - spark.openlineage.transport.*: Additional transport-specific properties
        - spark.openlineage.namespace: OpenLineage namespace
    """

Usage Examples

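Both injection functions require the runtime Airflow task context, so the examples below build the Spark configuration from a pre_execute callable (or another runtime hook) rather than passing a builder function directly to operator arguments.
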
Basic Spark Integration

from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

def create_spark_task_with_lineage(**context):
    """Create Spark task with automatic OpenLineage integration."""
    
    # Base Spark configuration
    spark_properties = {
        'spark.app.name': 'data-processing-job',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.sql.adaptive.enabled': 'true'
    }
    
    # Inject parent job information
    spark_properties = inject_parent_job_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )
    
    # Inject transport information
    spark_properties = inject_transport_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )
    
    print("Spark properties with OpenLineage integration:")
    for key, value in spark_properties.items():
        if 'openlineage' in key.lower():
            print(f"  {key}: {value}")
    
    return spark_properties

# Both injection helpers need the runtime task context, so the conf cannot be
# built at DAG-parse time. One option is a pre_execute callable, which receives
# the context just before the task runs. Note: this assumes the operator exposes
# its Spark conf as task.conf; the attribute name may differ across apache-spark
# provider versions.
def _apply_lineage_conf(context):
    context['task'].conf = create_spark_task_with_lineage(**context)

spark_task = SparkSubmitOperator(
    task_id='process_data_with_lineage',
    application='/path/to/spark_job.py',
    pre_execute=_apply_lineage_conf,
    dag=dag
)

Spark Submit with Automatic Lineage

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def spark_job_with_openlineage(**context):
    """Configure Spark job with OpenLineage integration."""
    
    # Start with basic configuration
    base_conf = {
        'spark.app.name': f"airflow-job-{context['task_instance'].task_id}",
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4'
    }
    
    # Add OpenLineage parent job information
    conf_with_parent = inject_parent_job_information_into_spark_properties(base_conf, context)
    
    # Add OpenLineage transport configuration
    final_conf = inject_transport_information_into_spark_properties(conf_with_parent, context)
    
    return final_conf

# Create Spark task; build the conf at runtime from a pre_execute hook, as above
def _apply_analytics_conf(context):
    context['task'].conf = spark_job_with_openlineage(**context)

spark_analytics = SparkSubmitOperator(
    task_id='spark_analytics_job',
    application='/opt/spark/jobs/analytics.py',
    application_args=['--input', '/data/raw/', '--output', '/data/processed/'],
    pre_execute=_apply_analytics_conf,
    dag=dag
)

PySpark with Custom Configuration

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_pyspark_config(**context):
    """Create PySpark configuration with OpenLineage integration."""
    
    # PySpark-specific configuration
    pyspark_conf = {
        'spark.app.name': 'pyspark-data-pipeline',
        'spark.submit.deployMode': 'cluster',
        'spark.executor.instances': '10',
        'spark.executor.memory': '6g',
        'spark.executor.cores': '3',
        'spark.driver.memory': '2g',
        'spark.driver.cores': '1',
        
        # Python configuration
        'spark.pyspark.python': '/opt/python3.9/bin/python',
        'spark.pyspark.driver.python': '/opt/python3.9/bin/python',
        
        # Serialization
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
        
        # Dynamic allocation
        'spark.dynamicAllocation.enabled': 'true',
        'spark.dynamicAllocation.minExecutors': '2',
        'spark.dynamicAllocation.maxExecutors': '20'
    }
    
    # Inject OpenLineage configuration
    pyspark_conf = inject_parent_job_information_into_spark_properties(pyspark_conf, context)
    pyspark_conf = inject_transport_information_into_spark_properties(pyspark_conf, context)
    
    return pyspark_conf

def _apply_pyspark_conf(context):
    context['task'].conf = create_pyspark_config(**context)

pyspark_task = SparkSubmitOperator(
    task_id='pyspark_etl',
    application='/opt/spark/jobs/etl_pipeline.py',
    py_files='/opt/spark/libs/common_functions.py',  # comma-separated string, not a list
    files='/opt/spark/config/database.conf',
    pre_execute=_apply_pyspark_conf,
    dag=dag
)

Conditional Spark Integration

from airflow.providers.openlineage.conf import spark_inject_parent_job_info, spark_inject_transport_info
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def conditional_spark_config(**context):
    """Conditionally inject OpenLineage configuration based on settings."""
    
    base_conf = {
        'spark.app.name': 'conditional-lineage-job',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2'
    }
    
    # Check configuration flags
    inject_parent = spark_inject_parent_job_info()
    inject_transport = spark_inject_transport_info()
    
    print(f"Parent job injection enabled: {inject_parent}")
    print(f"Transport injection enabled: {inject_transport}")
    
    if inject_parent:
        base_conf = inject_parent_job_information_into_spark_properties(base_conf, context)
        print("Injected parent job information")
    
    if inject_transport:
        base_conf = inject_transport_information_into_spark_properties(base_conf, context)
        print("Injected transport information")
    
    return base_conf

def _apply_conditional_conf(context):
    context['task'].conf = conditional_spark_config(**context)

conditional_spark_task = SparkSubmitOperator(
    task_id='conditional_spark_job',
    application='/opt/spark/jobs/conditional_job.py',
    pre_execute=_apply_conditional_conf,
    dag=dag
)

EMR Spark Integration

from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_emr_spark_step(**context):
    """Create EMR Spark step with OpenLineage integration."""
    
    # Base Spark configuration for EMR
    spark_conf = {
        'spark.app.name': 'emr-openlineage-job',
        'spark.executor.memory': '8g',
        'spark.executor.cores': '4',
        'spark.driver.memory': '2g',
        'spark.sql.adaptive.enabled': 'true'
    }
    
    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
    
    # Convert to EMR spark-submit format
    spark_conf_args = []
    for key, value in spark_conf.items():
        spark_conf_args.extend(['--conf', f'{key}={value}'])
    
    # EMR step configuration
    step = {
        'Name': 'Spark Job with OpenLineage',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                # --class only applies to JVM applications; the submitted app is a Python script
                '--deploy-mode', 'cluster'
            ] + spark_conf_args + [
                's3://my-bucket/spark-jobs/data-processing.py',
                '--input', 's3://my-bucket/input/',
                '--output', 's3://my-bucket/output/'
            ]
        }
    }
    
    return [step]

# steps must be a concrete list (or a template), so build it at runtime via pre_execute
def _apply_emr_steps(context):
    context['task'].steps = create_emr_spark_step(**context)

emr_spark_task = EmrAddStepsOperator(
    task_id='emr_spark_with_lineage',
    job_flow_id='{{ ti.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
    pre_execute=_apply_emr_steps,
    dag=dag
)

Databricks Integration

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_databricks_config(**context):
    """Create Databricks configuration with OpenLineage integration."""
    
    # Base Databricks Spark configuration
    spark_conf = {
        'spark.app.name': 'databricks-openlineage-job',
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true'
    }
    
    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
    
    # Databricks job configuration
    databricks_config = {
        'spark_python_task': {
            'python_file': 'dbfs:/mnt/jobs/data_processing.py',
            'parameters': ['--input', 'dbfs:/mnt/data/input', '--output', 'dbfs:/mnt/data/output']
        },
        'new_cluster': {
            'spark_version': '11.3.x-scala2.12',
            'node_type_id': 'i3.xlarge',
            'num_workers': 4,
            'spark_conf': spark_conf
        }
    }
    
    return databricks_config

# The run payload needs the runtime context, so set it via pre_execute
def _apply_databricks_json(context):
    context['task'].json = create_databricks_config(**context)

databricks_task = DatabricksSubmitRunOperator(
    task_id='databricks_with_lineage',
    pre_execute=_apply_databricks_json,
    dag=dag
)

Kubernetes Spark Operator Integration

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)

def create_k8s_spark_config(**context):
    """Create Kubernetes Spark configuration with OpenLineage."""
    
    # Base Kubernetes Spark configuration
    spark_conf = {
        'spark.app.name': 'k8s-openlineage-job',
        'spark.kubernetes.container.image': 'my-registry/spark:3.4.0',
        'spark.kubernetes.driver.pod.name': f"spark-driver-{context['task_instance'].task_id}",
        'spark.executor.instances': '3',
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
        'spark.driver.cores': '1'
    }
    
    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
    
    return spark_conf

# SparkKubernetesOperator submits a SparkApplication manifest rather than a conf
# dict; merge the properties returned by create_k8s_spark_config(**context) into
# the manifest's spec.sparkConf (for example when rendering the YAML at runtime).
k8s_spark_task = SparkKubernetesOperator(
    task_id='k8s_spark_with_lineage',
    application_file='/opt/spark/jobs/k8s_job.yaml',  # SparkApplication manifest
    kubernetes_conn_id='kubernetes_default',
    dag=dag
)

Custom Spark Configuration Template

from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)
from airflow.models import Variable

def create_templated_spark_config(**context):
    """Create templated Spark configuration with environment-specific settings."""
    
    # Get environment from Airflow Variable
    environment = Variable.get('environment', default_var='development')
    
    # Environment-specific base configuration
    base_configs = {
        'development': {
            'spark.executor.memory': '2g',
            'spark.executor.cores': '1',
            'spark.executor.instances': '2'
        },
        'staging': {
            'spark.executor.memory': '4g',
            'spark.executor.cores': '2',
            'spark.executor.instances': '5'
        },
        'production': {
            'spark.executor.memory': '8g',
            'spark.executor.cores': '4',
            'spark.executor.instances': '10'
        }
    }
    
    # Start with environment-specific config
    spark_conf = base_configs.get(environment, base_configs['development']).copy()
    
    # Add common configuration
    spark_conf.update({
        'spark.app.name': f"{environment}-{context['task_instance'].task_id}",
        'spark.sql.adaptive.enabled': 'true',
        'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
    })
    
    # Add OpenLineage configuration
    spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
    spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
    
    # Add environment-specific OpenLineage namespace override
    if 'spark.openlineage.namespace' in spark_conf:
        spark_conf['spark.openlineage.namespace'] = f"{environment}_{spark_conf['spark.openlineage.namespace']}"
    
    return spark_conf

def _apply_templated_conf(context):
    context['task'].conf = create_templated_spark_config(**context)

templated_spark_task = SparkSubmitOperator(
    task_id='templated_spark_job',
    application='/opt/spark/jobs/templated_job.py',
    pre_execute=_apply_templated_conf,
    dag=dag
)

Configuration Examples

Airflow Configuration for Spark Integration

# airflow.cfg
[openlineage]
spark_inject_parent_job_info = true
spark_inject_transport_info = true
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_airflow
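The same options can also be supplied through Airflow's standard AIRFLOW__SECTION__KEY environment-variable mapping; the values below simply mirror the airflow.cfg example above.

# Environment-variable equivalents
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000"}'
AIRFLOW__OPENLINEAGE__NAMESPACE=production_airflow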

Injected Spark Properties

When the injection functions are used, the following properties are automatically added to Spark configurations:

# Parent job information
{
    'spark.openlineage.parentJobNamespace': 'production_airflow',
    'spark.openlineage.parentJobName': 'my_dag.my_task',
    'spark.openlineage.parentRunId': '<deterministic UUID identifying the parent task-instance run>'
}

# Transport information  
{
    'spark.openlineage.transport.type': 'http',
    'spark.openlineage.transport.url': 'http://marquez:5000',
    'spark.openlineage.transport.endpoint': '/api/v1/lineage',
    'spark.openlineage.namespace': 'production_airflow'
}

Integration Benefits

  1. Automatic Lineage Linking: Spark jobs automatically link to their parent Airflow tasks
  2. Unified Configuration: Single OpenLineage configuration shared between Airflow and Spark
  3. Complete Data Flow Tracking: End-to-end lineage from Airflow through Spark transformations
  4. Simplified Setup: No need for separate Spark OpenLineage configuration
  5. Environment Consistency: Same lineage backend for both orchestration and processing layers

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
