Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.
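To run the examples below, install the OpenLineage provider alongside Airflow, plus the provider that supplies the operator you launch Spark with (for example the Apache Spark provider for SparkSubmitOperator):

pip install apache-airflow-providers-openlineage apache-airflow-providers-apache-spark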
Utilities for integrating with Spark applications, including automatic injection of OpenLineage configuration into Spark properties so that lineage is tracked across Spark jobs launched from Airflow.
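Both helper functions need the live Airflow task context, so they must be called while a task is running. A minimal sketch of that pattern, using a TaskFlow task that launches spark-submit directly (the task name, base property, and job path are illustrative assumptions):

import subprocess

from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)

@task
def run_spark_job_with_lineage():
    # The Airflow context is only available at run time, inside the task.
    context = get_current_context()
    properties = {'spark.app.name': 'lineage-demo'}  # illustrative base properties
    properties = inject_parent_job_information_into_spark_properties(properties, context)
    properties = inject_transport_information_into_spark_properties(properties, context)
    # Turn the properties into spark-submit --conf arguments (hypothetical job path below).
    conf_args = []
    for key, value in properties.items():
        conf_args += ['--conf', f'{key}={value}']
    subprocess.run(['spark-submit', *conf_args, '/opt/spark/jobs/demo_job.py'], check=True)

The sections below show the same two calls wired into common Spark-launching operators.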
Function for injecting OpenLineage parent job information into Spark application properties.
def inject_parent_job_information_into_spark_properties(properties: dict, context: Context) -> dict:
"""
Inject OpenLineage parent job information into Spark application properties.
Automatically injects parent job details (namespace, job name, run ID) from the
current Airflow task context into Spark application properties. This enables
Spark applications to emit lineage events that are properly linked to their
parent Airflow jobs.
Args:
properties: Existing Spark application properties dictionary
context: Airflow task context containing lineage information
Returns:
dict: Updated properties dictionary with injected parent job information
Injected Properties:
- spark.openlineage.parentJobNamespace: OpenLineage namespace
- spark.openlineage.parentJobName: Parent job name
- spark.openlineage.parentRunId: Parent run identifier
"""Function for injecting OpenLineage transport configuration into Spark application properties.
def inject_transport_information_into_spark_properties(properties: dict, context: Context) -> dict:
"""
Inject OpenLineage transport information into Spark application properties.
Automatically injects transport configuration from Airflow's OpenLineage
settings into Spark application properties. This allows Spark applications
to emit lineage events to the same backend as Airflow without requiring
separate configuration.
Args:
properties: Existing Spark application properties dictionary
context: Airflow task context (used for configuration access)
Returns:
dict: Updated properties dictionary with injected transport configuration
Injected Properties:
- spark.openlineage.transport.type: Transport type (http, kafka, etc.)
- spark.openlineage.transport.url: Transport URL (for HTTP transport)
- spark.openlineage.transport.endpoint: API endpoint (for HTTP transport)
- spark.openlineage.transport.*: Additional transport-specific properties
- spark.openlineage.namespace: OpenLineage namespace
"""from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties
)
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
def create_spark_task_with_lineage(**context):
"""Create Spark task with automatic OpenLineage integration."""
# Base Spark configuration
spark_properties = {
'spark.app.name': 'data-processing-job',
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.sql.adaptive.enabled': 'true'
}
# Inject parent job information
spark_properties = inject_parent_job_information_into_spark_properties(
properties=spark_properties,
context=context
)
# Inject transport information
spark_properties = inject_transport_information_into_spark_properties(
properties=spark_properties,
context=context
)
print("Spark properties with OpenLineage integration:")
for key, value in spark_properties.items():
if 'openlineage' in key.lower():
print(f" {key}: {value}")
return spark_properties
# The injection functions need the live task context, so the conf cannot be built when the
# DAG file is parsed. Attach it just before execution with a pre_execute hook instead.
def attach_lineage_conf(context):
    # Assumes the operator keeps its Spark conf on a ``conf`` attribute
    # (older apache-spark provider releases used a leading underscore).
    context['task'].conf = create_spark_task_with_lineage(**context)

spark_task = SparkSubmitOperator(
    task_id='process_data_with_lineage',
    application='/path/to/spark_job.py',
    pre_execute=attach_lineage_conf,
    dag=dag,
)

A richer analytics job, layering the same two injection calls onto a larger base configuration:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties
)
def spark_job_with_openlineage(**context):
"""Configure Spark job with OpenLineage integration."""
# Start with basic configuration
base_conf = {
'spark.app.name': f"airflow-job-{context['task_instance'].task_id}",
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true',
'spark.executor.memory': '8g',
'spark.executor.cores': '4'
}
# Add OpenLineage parent job information
conf_with_parent = inject_parent_job_information_into_spark_properties(base_conf, context)
# Add OpenLineage transport configuration
final_conf = inject_transport_information_into_spark_properties(conf_with_parent, context)
return final_conf
# Create the Spark task; the conf is attached at run time (see the pre_execute note above)
spark_analytics = SparkSubmitOperator(
    task_id='spark_analytics_job',
    application='/opt/spark/jobs/analytics.py',
    application_args=['--input', '/data/raw/', '--output', '/data/processed/'],
    pre_execute=lambda context: setattr(
        context['task'], 'conf', spark_job_with_openlineage(**context)
    ),
    dag=dag,
)

A PySpark pipeline in cluster mode with dynamic allocation, again with OpenLineage properties injected:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)
def create_pyspark_config(**context):
"""Create PySpark configuration with OpenLineage integration."""
# PySpark-specific configuration
pyspark_conf = {
'spark.app.name': 'pyspark-data-pipeline',
'spark.submit.deployMode': 'cluster',
'spark.executor.instances': '10',
'spark.executor.memory': '6g',
'spark.executor.cores': '3',
'spark.driver.memory': '2g',
'spark.driver.cores': '1',
# Python configuration
'spark.pyspark.python': '/opt/python3.9/bin/python',
'spark.pyspark.driver.python': '/opt/python3.9/bin/python',
# Serialization
'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
# Dynamic allocation
'spark.dynamicAllocation.enabled': 'true',
'spark.dynamicAllocation.minExecutors': '2',
'spark.dynamicAllocation.maxExecutors': '20'
}
# Inject OpenLineage configuration
pyspark_conf = inject_parent_job_information_into_spark_properties(pyspark_conf, context)
pyspark_conf = inject_transport_information_into_spark_properties(pyspark_conf, context)
return pyspark_conf
pyspark_task = SparkSubmitOperator(
    task_id='pyspark_etl',
    application='/opt/spark/jobs/etl_pipeline.py',
    py_files=['/opt/spark/libs/common_functions.py'],
    files=['/opt/spark/config/database.conf'],
    pre_execute=lambda context: setattr(
        context['task'], 'conf', create_pyspark_config(**context)
    ),
    dag=dag,
)

Conditional injection, respecting the provider's spark_inject_* configuration flags:

from airflow.providers.openlineage.conf import spark_inject_parent_job_info, spark_inject_transport_info
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)
def conditional_spark_config(**context):
"""Conditionally inject OpenLineage configuration based on settings."""
base_conf = {
'spark.app.name': 'conditional-lineage-job',
'spark.executor.memory': '4g',
'spark.executor.cores': '2'
}
# Check configuration flags
inject_parent = spark_inject_parent_job_info()
inject_transport = spark_inject_transport_info()
print(f"Parent job injection enabled: {inject_parent}")
print(f"Transport injection enabled: {inject_transport}")
if inject_parent:
base_conf = inject_parent_job_information_into_spark_properties(base_conf, context)
print("Injected parent job information")
if inject_transport:
base_conf = inject_transport_information_into_spark_properties(base_conf, context)
print("Injected transport information")
return base_conf
conditional_spark_task = SparkSubmitOperator(
    task_id='conditional_spark_job',
    application='/opt/spark/jobs/conditional_job.py',
    pre_execute=lambda context: setattr(
        context['task'], 'conf', conditional_spark_config(**context)
    ),
    dag=dag,
)

Running on Amazon EMR: convert the injected properties into spark-submit --conf arguments for an EMR step:

from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)
def create_emr_spark_step(**context):
"""Create EMR Spark step with OpenLineage integration."""
# Base Spark configuration for EMR
spark_conf = {
'spark.app.name': 'emr-openlineage-job',
'spark.executor.memory': '8g',
'spark.executor.cores': '4',
'spark.driver.memory': '2g',
'spark.sql.adaptive.enabled': 'true'
}
# Add OpenLineage configuration
spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
# Convert to EMR spark-submit format
spark_conf_args = []
for key, value in spark_conf.items():
spark_conf_args.extend(['--conf', f'{key}={value}'])
# EMR step configuration
step = {
'Name': 'Spark Job with OpenLineage',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--deploy-mode', 'cluster',
'--class', 'org.apache.spark.examples.SparkPi'
] + spark_conf_args + [
's3://my-bucket/spark-jobs/data-processing.py',
'--input', 's3://my-bucket/input/',
'--output', 's3://my-bucket/output/'
]
}
}
return [step]
emr_spark_task = EmrAddStepsOperator(
    task_id='emr_spark_with_lineage',
    job_flow_id='{{ ti.xcom_pull(task_ids="create_emr_cluster", key="return_value") }}',
    # The steps depend on the runtime task context, so they are attached via pre_execute
    # (assumes the operator stores them on a ``steps`` attribute).
    pre_execute=lambda context: setattr(
        context['task'], 'steps', create_emr_spark_step(**context)
    ),
    dag=dag,
)

Running on Databricks: place the injected properties in the new cluster's spark_conf:

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)
def create_databricks_config(**context):
"""Create Databricks configuration with OpenLineage integration."""
# Base Databricks Spark configuration
spark_conf = {
'spark.app.name': 'databricks-openlineage-job',
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true'
}
# Add OpenLineage configuration
spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
# Databricks job configuration
databricks_config = {
'spark_python_task': {
'python_file': 'dbfs:/mnt/jobs/data_processing.py',
'parameters': ['--input', 'dbfs:/mnt/data/input', '--output', 'dbfs:/mnt/data/output']
},
'new_cluster': {
'spark_version': '11.3.x-scala2.12',
'node_type_id': 'i3.xlarge',
'num_workers': 4,
'spark_conf': spark_conf
}
}
return databricks_config
databricks_task = DatabricksSubmitRunOperator(
    task_id='databricks_with_lineage',
    # The run payload depends on the runtime task context, so it is attached via
    # pre_execute (assumes the operator stores it on a ``json`` attribute).
    pre_execute=lambda context: setattr(
        context['task'], 'json', create_databricks_config(**context)
    ),
    dag=dag,
)

Spark on Kubernetes: inject OpenLineage properties into a SparkApplication's sparkConf:

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)
def create_k8s_spark_config(**context):
"""Create Kubernetes Spark configuration with OpenLineage."""
# Base Kubernetes Spark configuration
spark_conf = {
'spark.app.name': 'k8s-openlineage-job',
'spark.kubernetes.container.image': 'my-registry/spark:3.4.0',
'spark.kubernetes.driver.pod.name': f"spark-driver-{context['task_instance'].task_id}",
'spark.executor.instances': '3',
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.driver.memory': '2g',
'spark.driver.cores': '1'
}
# Add OpenLineage configuration
spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
return spark_conf
k8s_spark_task = SparkKubernetesOperator(
    task_id='k8s_spark_with_lineage',
    # SparkKubernetesOperator takes a SparkApplication manifest rather than a conf argument;
    # the properties returned by create_k8s_spark_config belong under the manifest's
    # spec.sparkConf section (for example, rendered into the YAML before the run).
    application_file='/opt/spark/jobs/k8s_job.yaml',
    kubernetes_conn_id='kubernetes_default',
    dag=dag,
)

Environment-aware configuration: size the job per environment with an Airflow Variable, then inject OpenLineage properties:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)
from airflow.models import Variable
def create_templated_spark_config(**context):
"""Create templated Spark configuration with environment-specific settings."""
# Get environment from Airflow Variable
environment = Variable.get('environment', default_var='development')
# Environment-specific base configuration
base_configs = {
'development': {
'spark.executor.memory': '2g',
'spark.executor.cores': '1',
'spark.executor.instances': '2'
},
'staging': {
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
'spark.executor.instances': '5'
},
'production': {
'spark.executor.memory': '8g',
'spark.executor.cores': '4',
'spark.executor.instances': '10'
}
}
# Start with environment-specific config
spark_conf = base_configs.get(environment, base_configs['development']).copy()
# Add common configuration
spark_conf.update({
'spark.app.name': f"{environment}-{context['task_instance'].task_id}",
'spark.sql.adaptive.enabled': 'true',
'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
})
# Add OpenLineage configuration
spark_conf = inject_parent_job_information_into_spark_properties(spark_conf, context)
spark_conf = inject_transport_information_into_spark_properties(spark_conf, context)
# Add environment-specific OpenLineage namespace override
if 'spark.openlineage.namespace' in spark_conf:
spark_conf['spark.openlineage.namespace'] = f"{environment}_{spark_conf['spark.openlineage.namespace']}"
return spark_conf
templated_spark_task = SparkSubmitOperator(
    task_id='templated_spark_job',
    application='/opt/spark/jobs/templated_job.py',
    pre_execute=lambda context: setattr(
        context['task'], 'conf', create_templated_spark_config(**context)
    ),
    dag=dag,
)

Provider configuration: the injection behaviour and transport settings are controlled by the [openlineage] section of airflow.cfg, for example:

# airflow.cfg
[openlineage]
spark_inject_parent_job_info = true
spark_inject_transport_info = true
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_airflow
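The same options can also be supplied as environment variables using Airflow's standard AIRFLOW__SECTION__KEY convention, for example:

AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
AIRFLOW__OPENLINEAGE__NAMESPACE=production_airflow

When the injection functions are used, the following properties are automatically added to the Spark configuration: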
# Parent job information
{
'spark.openlineage.parentJobNamespace': 'production_airflow',
'spark.openlineage.parentJobName': 'my_dag.my_task',
'spark.openlineage.parentRunId': 'my_dag.my_task.2023-12-01T00:00:00+00:00.1'
}
# Transport information
{
'spark.openlineage.transport.type': 'http',
'spark.openlineage.transport.url': 'http://marquez:5000',
'spark.openlineage.transport.endpoint': '/api/v1/lineage',
'spark.openlineage.namespace': 'production_airflow'
}
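To confirm on the Spark side that the injected properties arrived, the submitted job can log them; a small PySpark sketch that reads the active session's configuration:

# Inside the submitted Spark job: print the OpenLineage properties received from Airflow.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
for key, value in spark.sparkContext.getConf().getAll():
    if key.startswith('spark.openlineage.'):
        print(f'{key}={value}')

Install with Tessl CLI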
npx tessl i tessl/pypi-apache-airflow-providers-openlineage