Common Compatibility Provider - providing compatibility code for previous Airflow versions
OpenLineage facets, utilities, and compatibility functions for data lineage tracking, including SQL and Spark integration utilities. This module provides version-compatible OpenLineage integration with graceful fallbacks when OpenLineage dependencies are not available.
Core OpenLineage classes for data lineage representation.
class BaseFacet:
    """Base class for OpenLineage facets."""

class Dataset:
    """OpenLineage dataset representation."""

class DatasetFacet:
    """Base class for dataset-specific facets."""

class InputDataset:
    """Input dataset in OpenLineage events."""

class OutputDataset:
    """Output dataset in OpenLineage events."""

class RunFacet:
    """Base class for run-specific facets."""
Specific facet classes for detailed lineage tracking.

class ColumnLineageDatasetFacet:
    """Facet for tracking column-level lineage."""

class DocumentationDatasetFacet:
    """Facet for dataset documentation."""

class SchemaDatasetFacet:
    """Facet for dataset schema information."""

class LifecycleStateChangeDatasetFacet:
    """Facet for dataset lifecycle state changes."""

class OutputStatisticsOutputDatasetFacet:
    """Facet for output dataset statistics."""

class SymlinksDatasetFacet:
    """Facet for dataset symlink information."""
Facets for tracking job execution and run information.

class ErrorMessageRunFacet:
    """Facet for capturing error messages in runs."""

class ExternalQueryRunFacet:
    """Facet for external query execution information."""

class ExtractionErrorRunFacet:
    """Facet for lineage extraction errors."""

class SQLJobFacet:
    """Facet for SQL job information."""
Helper classes for OpenLineage data structures.

class Fields:
    """Field definitions for schema facets."""

class InputField:
    """Input field definition for column lineage."""

class Error:
    """Error representation in OpenLineage events."""

class LifecycleStateChange:
    """Lifecycle state change representation."""

class PreviousIdentifier:
    """Previous identifier for renamed datasets."""

class Identifier:
    """Dataset identifier representation."""

class SchemaDatasetFacetFields:
    """Schema field definitions."""
Function that creates no-op implementations when OpenLineage is not available.

def create_no_op(*_, **__) -> None:
    """
    Create a no-op placeholder when OpenLineage client is not available.

    Returns:
        None: Always returns None as a no-op implementation.
    """
Utilities for extracting OpenLineage facets from SQL operations.

def get_openlineage_facets_with_sql(
    hook,
    sql: str | list[str],
    conn_id: str,
    database: str | None,
):
    """
    Get OpenLineage facets from SQL queries.

    Args:
        hook: Database hook instance.
        sql (str | list[str]): SQL query or queries to analyze.
        conn_id (str): Connection ID for the database.
        database (str | None): Database name.

    Returns:
        OpenLineage facets extracted from the SQL operation.
    """
Utilities for injecting OpenLineage information into Spark properties.

def inject_parent_job_information_into_spark_properties(
    properties: dict,
    context: Context,
) -> dict:
    """
    Inject OpenLineage parent job information into Spark properties.

    Args:
        properties (dict): Spark configuration properties.
        context (Context): Airflow task context.

    Returns:
        dict: Updated properties with OpenLineage parent job information.
    """

def inject_transport_information_into_spark_properties(
    properties: dict,
    context: Context,
) -> dict:
    """
    Inject OpenLineage transport information into Spark properties.

    Args:
        properties (dict): Spark configuration properties.
        context (Context): Airflow task context.

    Returns:
        dict: Updated properties with OpenLineage transport configuration.
    """
Utility for translating Airflow assets for OpenLineage compatibility.

def translate_airflow_asset(*args, **kwargs):
    """
    Translate Airflow assets for OpenLineage compatibility.

    Handles asset/dataset compatibility between Airflow versions. Maps to
    translate_airflow_asset in newer versions or translate_airflow_dataset
    with parameter renaming in older versions.

    Note: This function is conditionally imported and may have different
    signatures depending on the OpenLineage provider version and the
    Airflow version.
    """

Example usage combining the facets, the no-op fallback, and the SQL and Spark utilities.

from airflow.providers.common.compat.openlineage.facet import create_no_op
from airflow.providers.common.compat.openlineage.utils.sql import get_openlineage_facets_with_sql
from airflow.providers.common.compat.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties
)
from airflow.providers.common.compat.openlineage.check import require_openlineage_version
# Import the facet classes defensively so the ImportError fallback below
# can actually trigger when OpenLineage dependencies are missing
try:
    from airflow.providers.common.compat.openlineage.facet import (
        Dataset,
        SchemaDatasetFacet,
        SchemaDatasetFacetFields,
        SQLJobFacet,
    )

    # Create an OpenLineage dataset
    input_dataset = Dataset(
        namespace="postgresql://localhost:5432",
        name="analytics.users",
    )

    # Create a schema facet; fields are SchemaDatasetFacetFields, not plain dicts
    schema_facet = SchemaDatasetFacet(
        fields=[
            SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
            SchemaDatasetFacetFields(name="email", type="VARCHAR(255)"),
        ]
    )

    # Create a SQL job facet
    sql_facet = SQLJobFacet(query="SELECT * FROM users WHERE active = true")
except ImportError:
    # Fall back to no-op placeholders when OpenLineage is not available
    input_dataset = create_no_op()
    schema_facet = create_no_op()
    sql_facet = create_no_op()
# SQL integration example
@require_openlineage_version(provider_min_version="1.0.0")
def extract_sql_lineage(**context):
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="postgres_default")
    sql = "SELECT user_id, email FROM users WHERE created_date = '{{ ds }}'"

    # Get OpenLineage facets from the SQL statement
    facets = get_openlineage_facets_with_sql(
        hook=hook,
        sql=sql,
        conn_id="postgres_default",
        database="analytics",
    )
    return facets
# Spark integration example
@require_openlineage_version(provider_min_version="1.2.0")
def configure_spark_with_openlineage(**context):
    spark_properties = {
        "spark.app.name": "data-processing",
        "spark.sql.adaptive.enabled": "true",
    }

    # Inject OpenLineage parent job information
    spark_properties = inject_parent_job_information_into_spark_properties(
        properties=spark_properties,
        context=context,
    )

    # Inject OpenLineage transport configuration
    spark_properties = inject_transport_information_into_spark_properties(
        properties=spark_properties,
        context=context,
    )
    return spark_properties
# Use in operators; OpenLineage automatically extracts lineage information
from airflow.providers.postgres.operators.postgres import PostgresOperator

sql_task = PostgresOperator(
    task_id="process_users",
    sql="""
        INSERT INTO processed_users
        SELECT user_id, UPPER(email) as email
        FROM users
        WHERE created_date = '{{ ds }}'
    """,
    postgres_conn_id="postgres_default",
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-compat