CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/xcom.md

Cross-Communication (XCom)

Cross-communication system for sharing data between tasks, including XComArg, custom backends, and serialization. XCom enables tasks to exchange small amounts of data and coordinate execution.

Capabilities

XCom Operations

Store and retrieve data between tasks in a DAG execution.

class XCom:
    """
    Values exchanged between tasks (XComs) and the class methods that manage them.

    Every method below is an interface stub: only its signature and docstring
    are shown in this reference.

    NOTE(review): newer Airflow releases identify a DAG run by ``run_id``
    rather than ``execution_date``; confirm these signatures against the
    installed version before relying on them.
    """
    @classmethod
    def get_one(
        cls,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        execution_date: Optional[datetime] = None,
        session: Optional[Session] = None
    ) -> Any:
        """
        Get a single XCom value matching the given filters.

        Args:
            key: XCom key (None for the return value)
            task_id: ID of the task that pushed the value
            dag_id: ID of the DAG containing that task
            execution_date: Execution date of the DAG run
            session: Database session to query with

        Returns:
            The stored XCom value.
        """

    @classmethod
    def get_many(
        cls,
        execution_date: datetime,
        key: Optional[str] = None,
        task_ids: Optional[List[str]] = None,
        dag_ids: Optional[List[str]] = None,
        include_prior_dates: bool = False,
        limit: Optional[int] = None,
        session: Optional[Session] = None
    ) -> List[Dict[str, Any]]:
        """
        Get every XCom record matching the given filters.

        Args:
            execution_date: Execution date of the DAG run (required)
            key: Only return records with this key
            task_ids: Only return records pushed by these tasks
            dag_ids: Only return records from these DAGs
            include_prior_dates: Also include records from earlier execution dates
            limit: Maximum number of records to return
            session: Database session to query with

        Returns:
            List of XCom records, one dict per record.
        """

    @classmethod
    def set(
        cls,
        key: str,
        value: Any,
        task_id: str,
        dag_id: str,
        execution_date: datetime,
        session: Optional[Session] = None
    ) -> None:
        """
        Store an XCom value under ``key`` for the given task and run.

        Args:
            key: XCom key to store under
            value: Value to store (passed through the configured serializer)
            task_id: ID of the task the value belongs to
            dag_id: ID of the DAG containing that task
            execution_date: Execution date of the DAG run
            session: Database session to write with
        """

    @classmethod
    def delete(
        cls,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        execution_date: Optional[datetime] = None,
        session: Optional[Session] = None
    ) -> None:
        """
        Delete XCom values matching the given filters.

        Args:
            key: XCom key filter
            task_id: Task ID filter
            dag_id: DAG ID filter
            execution_date: Execution date filter
            session: Database session to write with

        NOTE(review): filters left as None are presumably not applied, so
        passing fewer filters deletes more rows; confirm before use.
        """

    @classmethod
    def clear(
        cls,
        execution_date: datetime,
        dag_id: str,
        task_id: Optional[str] = None,
        session: Optional[Session] = None
    ) -> None:
        """
        Clear XCom values for one DAG run.

        Args:
            execution_date: Execution date of the DAG run
            dag_id: ID of the DAG
            task_id: Restrict clearing to this task (None presumably clears
                the whole run; confirm)
            session: Database session to write with
        """

XCom Arguments

Handle XCom data as task arguments with automatic dependency management.

class XComArg:
    """
    Reference to an upstream operator's XCom output, usable as a task argument.

    Passing an XComArg into another task supplies the upstream value at run
    time and adds the upstream task as a dependency.
    """
    def __init__(
        self,
        operator: BaseOperator,
        key: Optional[str] = None
    ):
        """
        Create an XCom argument reference.

        Args:
            operator: Operator whose XCom output is referenced
            key: XCom key (None for the return value)
        """

    def resolve(self, context: Context) -> Any:
        """
        Look up the referenced XCom value for the running task.

        Args:
            context: Task execution context

        Returns:
            The resolved XCom value.
        """

    def __getitem__(self, key: str) -> 'XComArg':
        """
        Derive a new XComArg selected by ``key`` (enables ``arg['name']``).

        Args:
            key: Key to select

        Returns:
            New XComArg for the selected data.

        NOTE(review): this was previously described as nested dict/list-index
        access, but ``key`` is typed ``str``. In Airflow's implementation it
        appears to select a different XCom key on the same operator (for
        example one of the keys pushed with ``multiple_outputs``); confirm.
        """

    def map(self, function: Callable) -> 'XComArg':
        """
        Derive a new XComArg whose value is transformed by ``function``.

        Args:
            function: Callable applied to the referenced value

        Returns:
            New XComArg with the transformed value.

        NOTE(review): in Airflow's dynamic task mapping, ``map`` applies the
        callable to each element of a list-like XCom value; confirm.
        """

Usage example:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import XComArg


@dag(dag_id='xcom_example', start_date=datetime(2024, 1, 1))
def xcom_example():
    """Extract records, transform them, and combine metadata from both steps.

    Each @task's return value is stored as an XCom. Passing one task's output
    (an XComArg) into another task wires up the dependency and hands the value
    over at run time.
    """

    @task
    def extract_data():
        """Extract data and return via XCom."""
        return {
            'records': [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}],
            'count': 2,
            'metadata': {'source': 'database', 'timestamp': '2024-01-01T10:00:00Z'}
        }

    @task
    def process_records(data):
        """Upper-case each record's name and stamp it with a processing time."""
        processed = [
            {
                'id': record['id'],
                'name': record['name'].upper(),
                'processed_at': datetime.now().isoformat(),
            }
            for record in data['records']
        ]
        return {'processed_records': processed, 'count': len(processed)}

    @task
    def save_metadata(raw, processed):
        """Combine counts and source information from both upstream tasks."""
        # Parameter names deliberately differ from the `extract_data` task
        # defined above, which a parameter of that name would shadow.
        return {
            'original_count': raw['count'],
            'processed_count': processed['count'],
            'source': raw['metadata']['source'],
        }

    # Set up data flow: each task call returns an XComArg for its output.
    raw_data = extract_data()
    processed_data = process_records(raw_data)
    save_metadata(raw_data, processed_data)


dag_instance = xcom_example()

XCom Backends

Customize XCom storage and serialization.

class BaseXCom:
    """
    Base class for XCom backends.

    Subclass it and override ``serialize_value`` / ``deserialize_value`` to
    change how XCom values are written to and read from storage.
    """

    @classmethod
    def serialize_value(
        cls,
        value: Any,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        execution_date: Optional[datetime] = None
    ) -> Any:
        """
        Convert a value into the form written to XCom storage.

        Args:
            value: Value to serialize
            key: XCom key the value is stored under
            task_id: ID of the task that pushed the value
            dag_id: ID of the DAG containing that task
            execution_date: Execution date of the DAG run

        Returns:
            The serialized value.

        NOTE(review): newer Airflow versions make the identifiers keyword-only
        and also pass ``run_id`` / ``map_index``; confirm before overriding.
        """

    @classmethod
    def deserialize_value(
        cls,
        result: Any
    ) -> Any:
        """
        Convert a stored XCom back into the original value.

        Args:
            result: Stored XCom (as produced via ``serialize_value``)

        Returns:
            The deserialized value.
        """

    @classmethod
    def orm_deserialize_value(cls) -> Any:
        """
        Deserialize the value for ORM-level display (e.g. listing XComs).

        NOTE(review): declared here as a classmethod with no value argument;
        in Airflow's BaseXCom this is an instance method that reads the row's
        stored value. Confirm before overriding.
        """

# Example custom XCom backend
class S3XComBackend(BaseXCom):
    """Store large XCom payloads in S3, keeping only a reference in the database.

    Dict values whose ``str()`` form is longer than ``size_threshold``
    characters are replaced by a small reference record. All other values go
    to ``BaseXCom.serialize_value`` unchanged.

    This is an illustrative sketch: the S3 upload itself, and a matching
    ``deserialize_value`` that downloads the object again, still need to be
    implemented.
    """

    # Length of str(value) above which a dict is offloaded to S3.
    size_threshold = 1000

    @classmethod
    def serialize_value(cls, value, **kwargs):
        """Serialize *value*, replacing large dicts with an S3 reference record.

        Args:
            value: Value to store.
            **kwargs: XCom identifiers forwarded by Airflow. ``dag_id``,
                ``task_id`` and ``key`` are required for offloaded values.
                ``run_id`` (or ``execution_date``) and ``map_index`` are
                optional.

        Returns:
            The base serializer's output for either the original value or the
            reference record.

        Raises:
            KeyError: if a value is offloaded but ``dag_id``, ``task_id`` or
                ``key`` is missing from ``kwargs``.
        """
        if isinstance(value, dict) and len(str(value)) > cls.size_threshold:
            # Include the run identifier and map index so successive DAG runs
            # and mapped task instances do not overwrite each other's object.
            # Newer Airflow passes run_id; the BaseXCom signature in this
            # reference passes execution_date. Airflow uses -1 for unmapped tasks.
            run = kwargs.get('run_id') or kwargs.get('execution_date')
            map_index = kwargs.get('map_index', -1)
            s3_key = (
                f"xcom/{kwargs['dag_id']}/{run}/{kwargs['task_id']}/"
                f"{map_index}/{kwargs['key']}"
            )
            # TODO: upload `value` to S3 under `s3_key` before continuing.
            # Send the reference through the base serializer so it is stored
            # in the same format as every other XCom value.
            value = {'s3_key': s3_key, 'type': 's3_reference'}
        return super().serialize_value(value, **kwargs)

Context and Template Variables

Access XCom and other data through task context and templates.

class Context:
    """
    Task execution context: runtime objects, dates, and parameters for a running task.

    Task code reads it by key, e.g. ``context['ti']``. The annotations below
    list the available entries and their types.
    """

    # Runtime objects (the task instance is what pulls and pushes XComs)
    task_instance: TaskInstance
    ti: TaskInstance           # alias of task_instance, read as context['ti']
    task: BaseOperator
    dag: DAG
    dag_run: DagRun

    # Date/time information
    execution_date: datetime
    ds: str                    # execution_date as YYYY-MM-DD
    ds_nodash: str             # execution_date as YYYYMMDD
    ts: str                    # execution_date as an ISO 8601 timestamp
    ts_nodash: str             # execution_date timestamp without separators

    # Previous/next dates (Optional: may be None)
    # NOTE(review): deprecated since Airflow 2.2 in favour of the
    # data-interval variables; confirm against the installed version.
    prev_execution_date: Optional[datetime]
    prev_ds: Optional[str]
    next_execution_date: Optional[datetime]
    next_ds: Optional[str]

    # Configuration and parameters
    params: Dict[str, Any]     # DAG- and task-level parameters
    var: Dict[str, Any]        # Airflow Variables access
    conf: Dict[str, Any]       # Airflow configuration access

    # XCom utilities
    def ti_xcom_pull(
        self,
        key: Optional[str] = None,
        task_ids: Optional[Union[str, List[str]]] = None,
        dag_id: Optional[str] = None,
        include_prior_dates: bool = False
    ) -> Any:
        """
        Pull XCom values in templates and task code.

        Args:
            key: XCom key (None presumably means the return-value key)
            task_ids: One task ID or a list of task IDs to pull from
            dag_id: DAG to pull from (presumably the current DAG when None)
            include_prior_dates: Also consider values from earlier runs

        Returns:
            The pulled value(s).

        NOTE(review): in task code the usual call is
        ``context['ti'].xcom_pull(...)`` on the TaskInstance; confirm this
        helper exists in the installed Airflow version.
        """

    def ti_xcom_push(
        self,
        key: str,
        value: Any
    ) -> None:
        """
        Push an XCom value in templates and task code.

        Args:
            key: XCom key to store under
            value: Value to store

        NOTE(review): the usual call is ``context['ti'].xcom_push(...)``;
        confirm this helper exists in the installed Airflow version.
        """

def get_current_context() -> Context:
    """Fetch the context of the task instance that is executing right now.

    Interface stub: no implementation is shown in this reference.

    Returns:
        The ``Context`` for the currently running task.
    """
    ...

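Usage in task code (in a Jinja-templated operator field, the equivalent is `{{ ti.xcom_pull(task_ids='upstream_task') }}`):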

from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id='template_xcom_example', start_date=datetime(2024, 1, 1))
def template_xcom_example():
    """Pull an upstream task's XCom explicitly through the execution context."""

    @task
    def upstream_task():
        return {'message': 'Hello from upstream', 'count': 42}

    @task
    def downstream_task():
        # Access XCom in task code through the current task instance.
        from airflow.operators.python import get_current_context
        context = get_current_context()

        # A @task's task_id defaults to its function name.
        upstream_data = context['ti'].xcom_pull(task_ids='upstream_task')
        return f"Received: {upstream_data['message']} with count {upstream_data['count']}"

    upstream = upstream_task()
    downstream = downstream_task()

    # downstream_task receives no XComArg input, so the ordering must be
    # declared explicitly for the pull to find the upstream value.
    upstream >> downstream


dag_instance = template_xcom_example()

XCom Serialization

Serialize and deserialize complex data types stored in XCom.

# Value types supported by default, grouped by kind (order preserved).
SUPPORTED_TYPES = [
    # Scalars
    int,
    float,
    str,
    bool,
    # Containers
    list,
    dict,
    tuple,
    set,
    # Date/time, exact-decimal and binary values
    datetime,
    date,
    timedelta,
    Decimal,
    bytes,
]

def serialize_xcom_value(value: Any) -> Tuple[str, Any]:
    """Turn *value* into a form that can be stored as an XCom.

    Interface stub: no implementation is shown in this reference.

    Args:
        value: The object to store.

    Returns:
        A ``(type_name, serialized_value)`` pair, where ``type_name`` records
        the original type so the value can be restored later.
    """
    ...

def deserialize_xcom_value(type_name: str, value: Any) -> Any:
    """Rebuild the original object from a stored XCom value.

    Interface stub: no implementation is shown in this reference.

    Args:
        type_name: Identifier of the original type.
        value: The stored, serialized value.

    Returns:
        The restored object.
    """
    ...

# Custom serialization example
class CustomXComSerializer:
    """Round-trip pandas DataFrames through XCom as JSON strings.

    ``serialize`` turns a DataFrame into its ``DataFrame.to_json()`` string
    and passes every other value through unchanged. ``deserialize`` reverses
    this for strings that look like a JSON object.
    """

    @staticmethod
    def serialize(obj):
        """Return *obj* as a JSON string if it is a DataFrame, else unchanged."""
        import pandas  # local import: this example module does not import pandas

        if isinstance(obj, pandas.DataFrame):
            return obj.to_json()
        return obj

    @staticmethod
    def deserialize(data):
        """Rebuild a DataFrame from a string produced by ``serialize``.

        Only strings starting with ``{"`` are treated as DataFrame JSON; all
        other values are returned unchanged. Because that check is a
        heuristic, a matching string that pandas cannot parse into a frame
        (for example a JSON object of scalars) is also returned unchanged
        instead of raising.
        """
        if not (isinstance(data, str) and data.startswith('{"')):
            return data

        import io
        import pandas

        try:
            # Wrap in StringIO: passing literal JSON text to read_json is
            # deprecated since pandas 2.1.
            return pandas.read_json(io.StringIO(data))
        except ValueError:
            return data

Types

from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context

# NOTE(review): `Session` in the XCom stubs is presumably
# sqlalchemy.orm.Session; it is not imported here.

# Aliases for XCom keys and values.
XComKey = Optional[str]    # None selects the default return-value key
XComValue = Any            # any value a task may push
SerializedValue = Union[str, int, float, bool, dict, list, None]  # storable form

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow

docs

assets-scheduling.md

cli-utilities.md

configuration.md

dag-management.md

database-models.md

exceptions.md

executors.md

extensions.md

index.md

task-operators.md

xcom.md

tile.json