Programmatically author, schedule and monitor data pipelines
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Cross-communication system for sharing data between tasks including XComArg, custom backends, and serialization. XCom enables tasks to exchange small amounts of data and coordinate execution.
Store and retrieve data between tasks in a DAG execution.
class XCom:
    """Store and retrieve data between tasks in a DAG execution.

    Records are addressed by key, task_id, dag_id, and execution_date
    (see :meth:`set`); ``get_*`` and ``delete``/``clear`` filter on the
    same fields.
    """

    @classmethod
    def get_one(
        cls,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        execution_date: Optional[datetime] = None,
        session: Optional[Session] = None
    ) -> Any:
        """
        Get a single XCom value.

        Args:
            key: XCom key (None for return value)
            task_id: Source task ID
            dag_id: Source DAG ID
            execution_date: Execution date
            session: Database session

        Returns:
            XCom value
        """

    @classmethod
    def get_many(
        cls,
        execution_date: datetime,
        key: Optional[str] = None,
        task_ids: Optional[List[str]] = None,
        dag_ids: Optional[List[str]] = None,
        include_prior_dates: bool = False,
        limit: Optional[int] = None,
        session: Optional[Session] = None
    ) -> List[Dict[str, Any]]:
        """
        Get multiple XCom values.

        Args:
            execution_date: Execution date
            key: XCom key filter
            task_ids: Task ID filters
            dag_ids: DAG ID filters
            include_prior_dates: Include values from prior dates
            limit: Maximum number of results
            session: Database session

        Returns:
            List of XCom records
        """

    @classmethod
    def set(
        cls,
        key: str,
        value: Any,
        task_id: str,
        dag_id: str,
        execution_date: datetime,
        session: Optional[Session] = None
    ) -> None:
        """
        Set an XCom value.

        Args:
            key: XCom key
            value: Value to store
            task_id: Source task ID
            dag_id: Source DAG ID
            execution_date: Execution date
            session: Database session
        """

    @classmethod
    def delete(
        cls,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        execution_date: Optional[datetime] = None,
        session: Optional[Session] = None
    ) -> None:
        """
        Delete XCom values.

        Args:
            key: XCom key filter
            task_id: Task ID filter
            dag_id: DAG ID filter
            execution_date: Execution date filter
            session: Database session
        """

    @classmethod
    def clear(
        cls,
        execution_date: datetime,
        dag_id: str,
        task_id: Optional[str] = None,
        session: Optional[Session] = None
    ) -> None:
        """
        Clear XCom values for a DAG run.

        Args:
            execution_date: Execution date
            dag_id: DAG ID
            task_id: Optional task ID filter
            session: Database session
        """


# Handle XCom data as task arguments with automatic dependency management.
class XComArg:
    """Reference to an operator's XCom output, usable as a task argument
    with automatic dependency management."""

    def __init__(
        self,
        operator: BaseOperator,
        key: Optional[str] = None
    ):
        """
        Create an XCom argument reference.

        Args:
            operator: Source operator
            key: XCom key (None for return value)
        """

    def resolve(self, context: Context) -> Any:
        """
        Resolve XCom value from context.

        Args:
            context: Task execution context

        Returns:
            Resolved XCom value
        """

    def __getitem__(self, key: str) -> 'XComArg':
        """
        Access nested XCom data.

        Args:
            key: Dictionary key or list index

        Returns:
            New XComArg for nested data
        """

    def map(self, function: Callable) -> 'XComArg':
        """
        Apply function to XCom value.

        Args:
            function: Function to apply

        Returns:
            New XComArg with transformed value
        """


# Usage example:
from airflow.decorators import dag, task
from airflow.models import XComArg


@dag(dag_id='xcom_example', start_date=datetime(2024, 1, 1))
def xcom_example():
    """Example DAG: pass data between tasks via TaskFlow XCom returns."""

    @task
    def extract_data():
        """Extract data and return via XCom."""
        return {
            'records': [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}],
            'count': 2,
            'metadata': {'source': 'database', 'timestamp': '2024-01-01T10:00:00Z'}
        }

    @task
    def process_records(data):
        """Process records from upstream task."""
        records = data['records']
        processed = []
        for record in records:
            processed.append({
                'id': record['id'],
                'name': record['name'].upper(),
                'processed_at': datetime.now().isoformat()
            })
        return {'processed_records': processed, 'count': len(processed)}

    @task
    def save_metadata(extract_data, process_data):
        """Save combined metadata."""
        metadata = {
            'original_count': extract_data['count'],
            'processed_count': process_data['count'],
            'source': extract_data['metadata']['source']
        }
        return metadata

    # Set up data flow: extract -> process -> combine metadata
    raw_data = extract_data()
    processed_data = process_records(raw_data)
    metadata = save_metadata(raw_data, processed_data)


dag_instance = xcom_example()

# Customize XCom storage and serialization.
class BaseXCom:
    """Base class for XCom backends.

    Subclasses override the serialization hooks to control how XCom
    values are converted to and from their stored representation.
    """

    @classmethod
    def serialize_value(
        cls,
        value: Any,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        execution_date: Optional[datetime] = None
    ) -> Any:
        """
        Convert *value* into the form persisted by the backend.

        Args:
            value: Value to serialize
            key: XCom key
            task_id: Task ID
            dag_id: DAG ID
            execution_date: Execution date

        Returns:
            Serialized value
        """

    @classmethod
    def deserialize_value(
        cls,
        result: Any
    ) -> Any:
        """
        Convert a stored *result* back into a Python value.

        Args:
            result: Serialized result

        Returns:
            Deserialized value
        """

    @classmethod
    def orm_deserialize_value(cls) -> Any:
        """Deserialization hook used when loading via the ORM."""
# Example custom XCom backend
class S3XComBackend(BaseXCom):
    """Store XCom data in S3 for large objects."""

    @classmethod
    def serialize_value(cls, value, **kwargs):
        """Offload large dict payloads to S3; defer small values to the base backend.

        Expects ``dag_id``, ``task_id``, and ``key`` in ``kwargs`` when the
        payload is large (used to build the S3 object key).
        """
        # Size heuristic: str() length over 1000 chars counts as "large".
        if isinstance(value, dict) and len(str(value)) > 1000:
            # Store large objects in S3
            s3_key = f"xcom/{kwargs['dag_id']}/{kwargs['task_id']}/{kwargs['key']}"
            # Upload to S3 and return reference
            return {'s3_key': s3_key, 'type': 's3_reference'}
        return super().serialize_value(value, **kwargs)


# Access XCom and other data through task context and templates.
class Context:
    """Task execution context containing XCom and other runtime data.

    Declarative specification of the fields available in a task's
    runtime context; values are populated by the scheduler/executor.
    """

    # XCom access
    task_instance: TaskInstance
    task: BaseOperator
    dag: DAG
    dag_run: DagRun

    # Date/time information
    execution_date: datetime
    ds: str  # execution_date as YYYY-MM-DD
    ds_nodash: str  # execution_date as YYYYMMDD
    ts: str  # execution_date as ISO timestamp
    ts_nodash: str  # execution_date as timestamp without separators

    # Previous/next dates (None when there is no prior/next run)
    prev_execution_date: Optional[datetime]
    prev_ds: Optional[str]
    next_execution_date: Optional[datetime]
    next_ds: Optional[str]

    # Configuration and parameters
    params: Dict[str, Any]  # DAG and task parameters
    var: Dict[str, Any]  # Variables access
    conf: Dict[str, Any]  # Configuration access

    # XCom utilities
    def ti_xcom_pull(
        self,
        key: Optional[str] = None,
        task_ids: Optional[Union[str, List[str]]] = None,
        dag_id: Optional[str] = None,
        include_prior_dates: bool = False
    ) -> Any:
        """Pull XCom values in templates and contexts."""

    def ti_xcom_push(
        self,
        key: str,
        value: Any
    ) -> None:
        """Push XCom values in templates and contexts."""
def get_current_context() -> Context:
    """
    Get the current task execution context.

    Returns:
        Current execution context
    """


# Usage in templates:
from airflow.decorators import dag, task


@dag(dag_id='template_xcom_example', start_date=datetime(2024, 1, 1))
def template_xcom_example():
    """Example DAG: pull upstream XCom values via the runtime context."""

    @task
    def upstream_task():
        return {'message': 'Hello from upstream', 'count': 42}

    @task
    def downstream_task():
        # Access XCom in task code
        from airflow.operators.python import get_current_context
        context = get_current_context()
        upstream_data = context['ti'].xcom_pull(task_ids='upstream_task')
        return f"Received: {upstream_data['message']} with count {upstream_data['count']}"

    upstream = upstream_task()
    downstream = downstream_task()
    upstream >> downstream


dag_instance = template_xcom_example()

# Handle complex data types in XCom.
# Supported types by default
# NOTE(review): `date`, `timedelta`, and `Decimal` are not imported in this
# file's visible import block — confirm the intended imports exist.
SUPPORTED_TYPES = [
    int, float, str, bool, list, dict, tuple, set,
    datetime, date, timedelta, Decimal, bytes
]
def serialize_xcom_value(value: Any) -> Tuple[str, Any]:
    """
    Serialize *value* for XCom storage.

    Args:
        value: Value to serialize

    Returns:
        Pair of ``(type_name, serialized_value)``
    """
def deserialize_xcom_value(type_name: str, value: Any) -> Any:
    """
    Reconstruct a Python value from its stored XCom representation.

    Args:
        type_name: Type identifier
        value: Serialized value

    Returns:
        Deserialized value
    """
# Custom serialization example
class CustomXComSerializer:
    """Example serializer that round-trips pandas DataFrames as JSON.

    NOTE(review): `pandas` is not imported anywhere in this file — the
    example presumably assumes a prior ``import pandas``; confirm.
    """

    @staticmethod
    def serialize(obj):
        # DataFrames become JSON strings; everything else passes through.
        if isinstance(obj, pandas.DataFrame):
            return obj.to_json()
        return obj

    @staticmethod
    def deserialize(data):
        # Heuristic: JSON-serialized DataFrames start with '{"'.
        if isinstance(data, str) and data.startswith('{"'):
            return pandas.read_json(data)
        return data


from typing import Union, Optional, List, Dict, Any, Callable
# Grouped stdlib imports; `date`, `timedelta`, `Decimal`, and `Tuple` are
# added because SUPPORTED_TYPES and serialize_xcom_value reference them.
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context

# Type aliases for XCom keys and values.
XComKey = Optional[str]
XComValue = Any
SerializedValue = Union[str, int, float, bool, dict, list, None]

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-apache-airflow