tessl/pypi-apache-airflow-providers-common-io

Common I/O Provider for Apache Airflow that provides unified file operations and transfers across different storage systems

XCom Object Storage Backend

XComObjectStorageBackend is an XCom backend that stores each value either in the Airflow database or in object storage, depending on a configurable size threshold. Values written to object storage can optionally be compressed, and the stored objects are removed when the corresponding XCom records are deleted.

Capabilities

XCom Backend Class

Core XCom backend implementation that extends Airflow's BaseXCom to provide object storage capabilities with size-based routing and compression support.

class XComObjectStorageBackend(BaseXCom):
    """
    XCom backend that stores data in an object store or database depending on the size of the data.
    
    If the value is larger than the configured threshold, it will be stored in an object store.
    Otherwise, it will be stored in the database. If it is stored in an object store, the path
    to the object in the store will be returned and saved in the database (by BaseXCom).
    """
    
    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str:
        """
        Serialize value for storage, routing to object storage based on size threshold.
        
        Parameters:
        - value: The value to serialize
        - key: XCom key identifier
        - task_id: Task identifier
        - dag_id: DAG identifier  
        - run_id: DAG run identifier
        - map_index: Task map index for mapped tasks
        
        Returns:
        - bytes | str: Serialized value or object storage path reference
        """
    
    @staticmethod
    def deserialize_value(result) -> Any:
        """
        Deserialize value from database or object storage.
        
        Compression is inferred from the file extension.
        
        Parameters:
        - result: XCom result from database
        
        Returns:
        - Any: Deserialized value
        """
    
    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None:
        """
        Clean up object storage files when XCom records are deleted.
        
        Parameters:
        - xcom: XCom result to purge
        - session: Optional database session
        """
    
    @staticmethod
    def _get_full_path(data: str) -> ObjectStoragePath:
        """
        Get the full object storage path from stored value.
        
        Parameters:
        - data: Stored path string
        
        Returns:
        - ObjectStoragePath: Full path object
        
        Raises:
        - ValueError: If the key is not relative to the configured path
        - TypeError: If the url is not a valid url or cannot be split
        """

Configuration Helper Functions

Utility functions for accessing XCom backend configuration with caching for performance.

def _get_compression_suffix(compression: str) -> str:
    """
    Return the compression suffix for the given compression algorithm.
    
    Parameters:
    - compression: Compression algorithm name
    
    Returns:
    - str: File extension suffix for compression
    
    Raises:
    - ValueError: If the compression algorithm is not supported
    """

@cache
def _get_base_path() -> ObjectStoragePath:
    """
    Get the configured base path for object storage.
    
    Returns:
    - ObjectStoragePath: Base path from configuration
    """

@cache  
def _get_compression() -> str | None:
    """
    Get the configured compression algorithm.
    
    Returns:
    - str | None: Compression algorithm or None if not configured
    """

@cache
def _get_threshold() -> int:
    """
    Get the configured size threshold for object storage.
    
    Returns:
    - int: Threshold in bytes (-1 for always database, 0 for always object storage)
    """

Configuration

Required Configuration

The XCom backend is configured in airflow.cfg or through the corresponding environment variables:

[common.io]
xcom_objectstorage_path = s3://conn_id@my-bucket/xcom-data
# Threshold in bytes (about 1 MB)
xcom_objectstorage_threshold = 1000000
# Optional compression
xcom_objectstorage_compression = gzip
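
For the environment-variable route, Airflow's documented convention is AIRFLOW__{SECTION}__{KEY}, with dots in the section name replaced by underscores. The sketch below derives the variable names for the three options; `env_var_name` is a hypothetical helper written for this example, and the values are the illustrative ones from this page.

```python
def env_var_name(section: str, key: str) -> str:
    """Build the environment variable name for a config option.

    Hypothetical helper following the AIRFLOW__{SECTION}__{KEY} convention.
    """
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


# Illustrative values only; adjust the path, threshold, and codec as needed.
settings = {
    "xcom_objectstorage_path": "s3://conn_id@my-bucket/xcom-data",
    "xcom_objectstorage_threshold": "1000000",
    "xcom_objectstorage_compression": "gzip",
}

env = {env_var_name("common.io", key): value for key, value in settings.items()}

for name, value in env.items():
    print(f"export {name}={value}")
```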

Configuration Options

# Configuration section: common.io
class XComConfiguration:
    """XCom object storage configuration options."""
    
    xcom_objectstorage_path: str
    """
    Path to a location on object storage where XComs can be stored in url format.
    Example: "s3://conn_id@bucket/path"
    Default: "" (required for backend to function)
    """
    
    xcom_objectstorage_threshold: int  
    """
    Threshold in bytes for storing XComs in object storage.
    -1: always store in database
    0: always store in object storage  
    positive number: store in object storage if size exceeds threshold
    Default: -1
    """
    
    xcom_objectstorage_compression: str
    """
    Compression algorithm to use when storing XComs in object storage.
    Supported: snappy, zip, gzip, bz2, lzma
    Note: Algorithm must be available in Python installation
    Default: "" (no compression)
    """

Usage Examples

Backend Configuration

Configure the XCom backend in airflow.cfg:

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://aws_default@my-xcom-bucket/xcom-data
# 1 MiB
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip

Automatic Usage in Tasks

Once configured, the backend works automatically with any XCom operations:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd

dag = DAG('xcom_example', start_date=datetime(2023, 1, 1))

def produce_large_data(**context):
    # Stored in object storage when the serialized result exceeds the configured threshold
    df = pd.DataFrame({'data': range(100000)})
    return df.to_dict()

def consume_data(**context):
    # Data automatically retrieved from object storage
    data = context['task_instance'].xcom_pull(task_ids='produce_data')
    df = pd.DataFrame(data)
    return len(df)

produce_task = PythonOperator(
    task_id='produce_data',
    python_callable=produce_large_data,
    dag=dag
)

consume_task = PythonOperator(
    task_id='consume_data', 
    python_callable=consume_data,
    dag=dag
)

produce_task >> consume_task

Small vs Large Data Handling

def small_data_task(**context):
    # Small data stored in database
    return {"status": "success", "count": 42}

def large_data_task(**context):
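    # Routed to object storage only if the serialized size exceeds the threshold.
    # This payload is roughly 340 KB as JSON, so a 1 MB threshold keeps it in the database.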
    return {
        "large_list": list(range(50000)),
        "metadata": {"processing_time": "5min"}
    }
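
Whether a payload counts as small or large depends on its serialized size relative to the threshold, so it can help to measure it. The sketch below uses plain `json.dumps` as an approximation; Airflow's own XCom encoder may produce a slightly different size, and `approx_size` and `THRESHOLD` are names introduced here for illustration.

```python
import json


def approx_size(value) -> int:
    """Approximate serialized size in bytes using plain JSON encoding."""
    return len(json.dumps(value).encode("utf-8"))


THRESHOLD = 1048576  # bytes, matching the configuration example

payloads = {
    "small": {"status": "success", "count": 42},
    "large": {
        "large_list": list(range(50000)),
        "metadata": {"processing_time": "5min"},
    },
}

for name, payload in payloads.items():
    size = approx_size(payload)
    target = "object storage" if size > THRESHOLD else "database"
    print(f"{name}: {size} bytes -> {target}")
```

Running a check like this before choosing a threshold shows which tasks would actually be offloaded.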

Storage Behavior

Size-Based Routing

  1. Below threshold: the serialized value is stored in the Airflow database as usual
  2. Above threshold: the value is compressed (if configured) and written to object storage
  3. Object storage path: when a value goes to object storage, its path is stored in the database as a reference
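
The routing rules can be sketched as a toy model. Everything here is an assumption made for illustration: `object_store` is an in-memory dictionary standing in for a bucket, `store` and `load` are hypothetical helpers, and "above threshold" is treated as strictly greater, which may not match the real backend's boundary handling.

```python
import json
import uuid
from typing import Any, Dict, Union

BASE_PATH = "s3://conn_id@my-bucket/xcom-data"  # illustrative base path
object_store: Dict[str, bytes] = {}  # in-memory stand-in for object storage


def store(value: Any, threshold: int) -> Union[bytes, str]:
    """Return what would be written to the database for `value`."""
    encoded = json.dumps(value).encode("utf-8")
    # -1 (any negative): always database; 0: always object storage.
    keep_in_db = threshold < 0 or (threshold > 0 and len(encoded) <= threshold)
    if keep_in_db:
        return encoded
    key = f"{BASE_PATH}/{uuid.uuid4()}"
    object_store[key] = encoded
    return key  # only the reference goes to the database


def load(stored: Union[bytes, str]) -> Any:
    """Resolve a database entry back to the original value."""
    raw = object_store[stored] if isinstance(stored, str) else stored
    return json.loads(raw)


small = store({"count": 42}, threshold=1048576)          # stays inline
large = store(list(range(500000)), threshold=1048576)    # becomes a reference
```

In this model the database row holds either the encoded value itself or a short path string, which is why consumers do not need to know where the data lives.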

File Organization

Objects are stored with the following path structure:

{base_path}/{dag_id}/{run_id}/{task_id}/{uuid}.{compression_suffix}
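
A minimal sketch of how such a path could be assembled follows. `build_object_path` is a hypothetical helper, and it assumes that the suffix is omitted when no compression is configured; the identifiers passed in are made-up examples.

```python
import uuid
from typing import Optional


def build_object_path(
    base_path: str,
    dag_id: str,
    run_id: str,
    task_id: str,
    compression_suffix: Optional[str] = None,
) -> str:
    """Assemble {base_path}/{dag_id}/{run_id}/{task_id}/{uuid}[.{suffix}]."""
    name = str(uuid.uuid4())  # unique per stored value
    if compression_suffix:
        name = f"{name}.{compression_suffix}"
    return "/".join([base_path.rstrip("/"), dag_id, run_id, task_id, name])


path = build_object_path(
    "s3://conn_id@my-bucket/xcom-data",
    dag_id="xcom_example",
    run_id="manual__2023-01-01",
    task_id="produce_data",
    compression_suffix="gz",
)
print(path)
```

Because the file name is a fresh UUID, repeated pushes from the same task instance do not overwrite each other in this scheme.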

Compression Support

Supported compression algorithms:

  • gzip: Gzip compression (Python standard library; objects get a .gz suffix)
  • bz2: Bzip2 compression (Python standard library)
  • zip: Zip compression (Python standard library)
  • lzma: LZMA compression (Python standard library)
  • snappy: Snappy compression (requires the python-snappy package)
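
Since compression is inferred from the file extension on read, the write and read sides only need to agree on a suffix. The sketch below shows that idea with standard-library codecs and local temporary files; `OPENERS`, `write_json`, and `read_json` are hypothetical names, and the real backend works against object storage rather than the local file system.

```python
import bz2
import gzip
import json
import tempfile
from pathlib import Path

# Suffix -> opener. Only two standard-library codecs are covered in this sketch.
OPENERS = {".gz": gzip.open, ".bz2": bz2.open}


def write_json(path: Path, value) -> None:
    """Write `value` as JSON, compressing according to the file suffix."""
    opener = OPENERS.get(path.suffix, open)  # unknown suffix: no compression
    with opener(path, "wb") as handle:
        handle.write(json.dumps(value).encode("utf-8"))


def read_json(path: Path):
    """Read JSON back, picking the codec from the file suffix."""
    opener = OPENERS.get(path.suffix, open)
    with opener(path, "rb") as handle:
        return json.loads(handle.read().decode("utf-8"))


payload = {"large_list": list(range(1000))}
results = {}
with tempfile.TemporaryDirectory() as tmp:
    for name in ("value.gz", "value.bz2", "value"):
        target = Path(tmp) / name
        write_json(target, payload)
        results[name] = read_json(target) == payload
```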

Cleanup and Lifecycle

  • Objects are automatically purged when XCom records are deleted
  • Supports Airflow's standard XCom cleanup mechanisms
  • Handles missing files gracefully during deserialization
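
One way to make cleanup tolerant of objects that are already gone is to treat a missing file as success. The sketch below shows that pattern with `pathlib` on a local temporary directory; `purge_object` is a hypothetical helper and is not taken from the provider's code.

```python
import tempfile
from pathlib import Path


def purge_object(path: Path) -> None:
    """Delete the stored object; a missing file is not an error."""
    path.unlink(missing_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    obj = Path(tmp) / "abc.gz"
    obj.write_bytes(b"payload")

    purge_object(obj)
    removed = not obj.exists()

    # A second purge of the same object must not raise.
    purge_object(obj)
    still_absent = not obj.exists()
```

Idempotent deletion of this kind keeps repeated or overlapping cleanup runs from failing on objects that were already removed.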

Version Compatibility

The backend supports both Airflow 2.x and 3.0+ through conditional imports:

  • Airflow 2.x: Uses airflow.models.xcom.BaseXCom and airflow.io.path.ObjectStoragePath
  • Airflow 3.0+: Uses airflow.sdk.bases.xcom.BaseXCom and airflow.sdk.ObjectStoragePath
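
The conditional-import idea can be sketched with a small fallback helper. `import_first` is a hypothetical function written for this example, and the provider itself may select the import path differently, for instance with a version check. The candidate list uses the module paths named above; the demonstration call uses standard-library modules so it runs without Airflow installed.

```python
from importlib import import_module


def import_first(candidates):
    """Return the first attribute that can be imported from `candidates`.

    `candidates` is a list of (module_name, attribute_name) pairs,
    tried in order.
    """
    errors = []
    for module_name, attr in candidates:
        try:
            return getattr(import_module(module_name), attr)
        except (ImportError, AttributeError) as exc:
            errors.append(f"{module_name}.{attr}: {exc}")
    raise ImportError("; ".join(errors))


# Airflow 3.0+ location first, then the Airflow 2.x location.
BASE_XCOM_CANDIDATES = [
    ("airflow.sdk.bases.xcom", "BaseXCom"),
    ("airflow.models.xcom", "BaseXCom"),
]

# Demonstration without Airflow: the first module does not exist,
# so the helper falls back to the second entry.
loads = import_first([("no_such_module_for_demo", "loads"), ("json", "loads")])
```

With Airflow installed, `import_first(BASE_XCOM_CANDIDATES)` would return whichever `BaseXCom` the running version provides.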

Airflow 2.9.0 or later is required to use this XCom backend.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-io
