tessl/pypi-apache-airflow-providers-openlineage

Provider package for Apache Airflow that enables comprehensive OpenLineage data lineage tracking and observability for data pipelines.

Configuration Management

Functions in airflow.providers.openlineage.conf for reading OpenLineage settings, including transport configuration, selective enabling, custom extractors, debugging options, and execution parameters.

Capabilities

Core Configuration Functions

Access fundamental OpenLineage configuration settings that control the behavior of lineage collection and emission.

def is_disabled() -> bool:
    """
    Check if OpenLineage is completely disabled.
    
    Returns:
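        bool: True if OpenLineage events should not be collected or emitted,
        either because OpenLineage is explicitly disabled or because no
        transport, config file, or legacy OPENLINEAGE_URL is configured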
    """

def namespace() -> str:
    """
    Get the OpenLineage namespace for lineage events.
    
    Returns:
        str: Namespace string that groups lineage events logically
    """

def transport() -> dict[str, Any]:
    """
    Get the transport configuration for sending OpenLineage events.
    
    Returns:
        dict: Transport configuration including type and connection details
    """

Selective Lineage Configuration

Settings that determine which DAGs, tasks, and operators emit lineage events.

def selective_enable() -> bool:
    """
    Check if selective lineage mode is enabled.
    
    When enabled, lineage is only collected for DAGs/tasks explicitly marked
    with enable_lineage().
    
    Returns:
        bool: True if selective enable mode is active
    """

def disabled_operators() -> set[str]:
    """
    Get the set of operator class names excluded from lineage collection.
    
    Read from the semicolon-separated [openlineage] disabled_for_operators option.
    
    Returns:
        set[str]: Set of fully qualified operator class names to exclude
    """

Custom Component Configuration

Access configuration for custom extractors and facets that extend OpenLineage functionality.

def custom_extractors() -> set[str]:
    """
    Get the set of custom extractor class paths registered for lineage extraction.
    
    Returns:
        set[str]: Set of fully qualified class paths for custom extractors
    """

def custom_run_facets() -> set[str]:
    """
    Get the set of custom run facet function paths for metadata enrichment.
    
    Returns:
        set[str]: Set of fully qualified function paths for custom facets
    """

Source Code and Task Information Configuration

Control whether source code and full task information are included in lineage events.

def is_source_enabled() -> bool:
    """
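    Check if source code inclusion is enabled for lineage events.
    
    Controlled by the [openlineage] disable_source_code option: this returns
    False when that option is set to true.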
    
    When enabled, operators like PythonOperator and BashOperator include
    their source code in the lineage events.
    
    Returns:
        bool: True if source code should be included in events
    """

def include_full_task_info() -> bool:
    """
    Check if full task information should be included in lineage events.
    
    When enabled, events include comprehensive task metadata which may
    contain large fields.
    
    Returns:
        bool: True if full task info should be included
    """

Spark Integration Configuration

Configuration specific to Spark application integration and property injection.

def spark_inject_parent_job_info() -> bool:
    """
    Check if parent job information should be injected into Spark applications.
    
    When enabled, automatically injects OpenLineage parent job details
    (namespace, job name, run id) into Spark application properties.
    
    Returns:
        bool: True if parent job info injection is enabled
    """

def spark_inject_transport_info() -> bool:
    """
    Check if transport information should be injected into Spark applications.
    
    When enabled, automatically injects OpenLineage transport configuration
    into Spark application properties for lineage emission.
    
    Returns:
        bool: True if transport info injection is enabled
    """

Performance and Debugging Configuration

Configuration options for performance tuning, debugging, and operational control.

def debug_mode() -> bool:
    """
    Check if debug mode is enabled for OpenLineage events.
    
    When enabled, events include debugging information such as installed
    packages and their versions, potentially creating large events.
    
    Returns:
        bool: True if debug mode is active
    """

def execution_timeout() -> int:
    """
    Get the maximum time allowed for OpenLineage metadata extraction.
    
    Returns:
        int: Timeout in seconds for metadata extraction operations
    """

def dag_state_change_process_pool_size() -> int:
    """
    Get the number of processes for handling DAG state changes asynchronously.
    
    Returns:
        int: Process pool size for async DAG state change processing
    """

Configuration File Access

Access to configuration file paths and legacy environment variable handling.

def config_path(check_legacy_env_var: bool = True) -> str:
    """
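    Get the path to the OpenLineage client configuration file.
    
    Args:
        check_legacy_env_var: Whether to fall back to the legacy
        OPENLINEAGE_CONFIG environment variable when the [openlineage]
        config_path option is not set
    
    Returns:
        str: Configured path to the OpenLineage configuration file, or an
        empty string if none is set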
    """

Usage Examples

Basic Configuration Check

from airflow.providers.openlineage.conf import is_disabled, namespace, transport

# Check if OpenLineage is enabled
if not is_disabled():
    print(f"OpenLineage namespace: {namespace()}")
    print(f"Transport config: {transport()}")

Selective Enable Configuration

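from airflow.providers.openlineage.conf import selective_enable, disabled_operators

# In selective mode, only DAGs/tasks marked with enable_lineage() emit events
if selective_enable():
    print("Selective lineage mode is active")

# disabled_for_operators is applied independently of selective mode
disabled = disabled_operators()
if disabled:
    print(f"Disabled operators: {sorted(disabled)}")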

Debug and Performance Settings

from airflow.providers.openlineage.conf import debug_mode, execution_timeout, include_full_task_info

print(f"Debug mode: {debug_mode()}")
print(f"Execution timeout: {execution_timeout()}s")
print(f"Include full task info: {include_full_task_info()}")

Custom Components Check

from airflow.providers.openlineage.conf import custom_extractors, custom_run_facets

extractors = custom_extractors()
facets = custom_run_facets()

print(f"Custom extractors: {extractors}")
print(f"Custom facets: {facets}")

Configuration Examples

Transport Configuration

# HTTP Transport
transport_config = {
    "type": "http",
    "url": "http://localhost:5000",
    "endpoint": "api/v1/lineage",
    "timeout": 5,
    "verify": False
}

# Kafka Transport  
transport_config = {
    "type": "kafka",
    "config": {
        "bootstrap.servers": "localhost:9092",
        "acks": "all",
        "retries": 3
    },
    "topic": "openlineage.events"
}

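# File Transport (log_file_path and append are the OpenLineage client's file transport fields)
transport_config = {
    "type": "file",
    "log_file_path": "/tmp/openlineage/events.json",
    "append": True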
}
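Each dictionary above is the value of the [openlineage] transport option, written as JSON. Airflow also reads options from environment variables named AIRFLOW__{SECTION}__{KEY}, so the same settings can be supplied as AIRFLOW__OPENLINEAGE__TRANSPORT. A small sketch that serializes a transport dict for that variable; transport_env is a hypothetical helper.

```python
import json


def transport_env(config: dict) -> dict[str, str]:
    # Airflow treats AIRFLOW__{SECTION}__{KEY} variables as airflow.cfg overrides
    return {"AIRFLOW__OPENLINEAGE__TRANSPORT": json.dumps(config)}


http_transport = {"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}
env = transport_env(http_transport)
```

The resulting mapping can be merged into a container or deployment environment; the value must stay valid JSON, since transport() expects it to parse to a dict.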

Airflow Configuration

Configuration options can be set in airflow.cfg:

[openlineage]
disabled = False
namespace = my_airflow_instance
transport = {"type": "http", "url": "http://localhost:5000"}
selective_enable = False
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator
extractors = my_package.CustomExtractor;my_package.AnotherExtractor
custom_run_facets = my_package.custom_facet_function
debug_mode = False
execution_timeout = 10
include_full_task_info = False
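To sanity-check such a section outside Airflow, the sketch below parses part of it with the standard library's configparser and converts each value to the type the corresponding conf function returns. It does not use Airflow's own config loader, so it ignores environment-variable overrides, and its fallback values are assumptions of this sketch rather than documented defaults.

```python
import configparser
import json

SAMPLE_CFG = """
[openlineage]
disabled = False
namespace = my_airflow_instance
transport = {"type": "http", "url": "http://localhost:5000"}
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator
execution_timeout = 10
"""


def load_openlineage_section(text: str) -> dict:
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser["openlineage"]
    operators = section.get("disabled_for_operators", fallback="")
    return {
        # Booleans and integers use configparser's typed getters
        "disabled": section.getboolean("disabled", fallback=False),
        "namespace": section.get("namespace", fallback="default"),
        # The transport option holds a JSON object
        "transport": json.loads(section.get("transport", fallback="{}")),
        # Semicolon-separated class paths become a set
        "disabled_for_operators": {n.strip() for n in operators.split(";") if n.strip()},
        "execution_timeout": section.getint("execution_timeout", fallback=10),
    }
```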

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-openlineage
