Common I/O Provider for Apache Airflow that provides unified file operations and transfers across different storage systems
The XComObjectStorageBackend stores each XCom value either in the Airflow metadata database or in object storage, depending on whether its serialized size exceeds a configurable threshold. It supports optional compression and deletes the stored object when the XCom record is purged. It also plugs into Airflow's standard XCom API, so tasks push and pull values as usual.
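The sketch below shows how the threshold rules decide where a value goes. These are the same -1 / 0 / positive semantics documented for xcom_objectstorage_threshold below. It is not the provider's code. `choose_destination` is a hypothetical helper, and plain `json.dumps` stands in for Airflow's own XCom serializer, so real payload sizes may differ somewhat.

```python
import json


def choose_destination(value, threshold: int) -> str:
    """Hypothetical helper: return "database" or "object_storage" for a value.

    json.dumps stands in for Airflow's XCom serializer (an assumption).
    """
    size = len(json.dumps(value).encode("utf-8"))
    if threshold == -1:
        # -1: never offload, everything stays in the metadata database
        return "database"
    if threshold == 0:
        # 0: always offload to object storage
        return "object_storage"
    # Positive threshold: offload only payloads larger than the threshold
    return "object_storage" if size > threshold else "database"


small = {"status": "success", "count": 42}
large = {"large_list": list(range(200_000))}  # roughly 1.5 MB as JSON

print(choose_destination(small, 1_048_576))  # database
print(choose_destination(large, 1_048_576))  # object_storage
print(choose_destination(large, -1))         # database
print(choose_destination(small, 0))          # object_storage
```

When a value is offloaded, only the path to the stored object is written to the database. deserialize_value later resolves that path back to the data.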
Core XCom backend implementation that extends Airflow's BaseXCom to provide object storage capabilities with size-based routing and compression support.
class XComObjectStorageBackend(BaseXCom):
    """
    XCom backend that stores data in an object store or database depending on the size of the data.
    If the value is larger than the configured threshold, it will be stored in an object store.
    Otherwise, it will be stored in the database. If it is stored in an object store, the path
    to the object in the store will be returned and saved in the database (by BaseXCom).
    """
    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str:
        """
        Serialize value for storage, routing to object storage based on size threshold.
        Parameters:
        - value: The value to serialize
        - key: XCom key identifier
        - task_id: Task identifier
        - dag_id: DAG identifier
        - run_id: DAG run identifier
        - map_index: Task map index for mapped tasks
        Returns:
        - bytes | str: Serialized value or object storage path reference
        """
    @staticmethod
    def deserialize_value(result) -> Any:
        """
        Deserialize value from database or object storage.
        Compression is inferred from the file extension.
        Parameters:
        - result: XCom result from database
        Returns:
        - Any: Deserialized value
        """
    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None:
        """
        Clean up object storage files when XCom records are deleted.
        Parameters:
        - xcom: XCom result to purge
        - session: Optional database session
        """
    @staticmethod
    def _get_full_path(data: str) -> ObjectStoragePath:
        """
        Get the full object storage path from stored value.
        Parameters:
        - data: Stored path string
        Returns:
        - ObjectStoragePath: Full path object
        Raises:
        - ValueError: If the key is not relative to the configured path
        - TypeError: If the URL is not valid or cannot be split
        """
Utility functions for accessing XCom backend configuration with caching for performance.
def _get_compression_suffix(compression: str) -> str:
    """
    Return the compression suffix for the given compression algorithm.
    Parameters:
    - compression: Compression algorithm name
    Returns:
    - str: File extension suffix for compression
    Raises:
    - ValueError: If the compression algorithm is not supported
    """
@cache
def _get_base_path() -> ObjectStoragePath:
    """
    Get the configured base path for object storage.
    Returns:
    - ObjectStoragePath: Base path from configuration
    """
@cache
def _get_compression() -> str | None:
    """
    Get the configured compression algorithm.
    Returns:
    - str | None: Compression algorithm or None if not configured
    """
@cache
def _get_threshold() -> int:
    """
    Get the configured size threshold for object storage.
    Returns:
    - int: Threshold in bytes (-1 for always database, 0 for always object storage)
    """
The XCom backend requires configuration in airflow.cfg or environment variables:
[common.io]
xcom_objectstorage_path = s3://conn_id@my-bucket/xcom-data
# 1 MB threshold
xcom_objectstorage_threshold = 1000000
# Optional compression
xcom_objectstorage_compression = gzip
# Configuration section: common.io
class XComConfiguration:
    """XCom object storage configuration options."""
    xcom_objectstorage_path: str
    """
    Path to a location on object storage where XComs can be stored, in URL format.
    Example: "s3://conn_id@bucket/path"
    Default: "" (required for backend to function)
    """
    xcom_objectstorage_threshold: int
    """
    Threshold in bytes for storing XComs in object storage.
    -1: always store in database
    0: always store in object storage
    positive number: store in object storage if size exceeds threshold
    Default: -1
    """
    xcom_objectstorage_compression: str
    """
    Compression algorithm to use when storing XComs in object storage.
    Supported: snappy, zip, gzip, bz2, lzma
    Note: the algorithm must be available in the Python installation
    Default: "" (no compression)
    """
Configure the XCom backend in airflow.cfg:
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://aws_default@my-xcom-bucket/xcom-data
# 1 MiB
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
Once configured, the backend works transparently with all XCom operations:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
# schedule=None: run only when triggered, so no catch-up runs are created
dag = DAG('xcom_example', start_date=datetime(2023, 1, 1), schedule=None)
def produce_large_data(**context):
    # Large DataFrame will be stored in object storage
    df = pd.DataFrame({'data': range(100000)})
    return df.to_dict()
def consume_data(**context):
    # Data automatically retrieved from object storage
    data = context['task_instance'].xcom_pull(task_ids='produce_data')
    df = pd.DataFrame(data)
    return len(df)
produce_task = PythonOperator(
    task_id='produce_data',
    python_callable=produce_large_data,
    dag=dag
)
consume_task = PythonOperator(
    task_id='consume_data',
    python_callable=consume_data,
    dag=dag
)
produce_task >> consume_task
def small_data_task(**context):
    # Small payload: stays in the database (below the threshold)
    return {"status": "success", "count": 42}
def large_data_task(**context):
    # Routed to object storage only if its serialized size exceeds xcom_objectstorage_threshold
    return {
        "large_list": list(range(50000)),
        "metadata": {"processing_time": "5min"}
    }
Objects are stored with the following path structure:
{base_path}/{dag_id}/{run_id}/{task_id}/{uuid}.{compression_suffix}
The .{compression_suffix} part is omitted when compression is disabled.
Supported compression algorithms: snappy, zip, gzip, bz2, lzma (the algorithm must be available in the Python installation; see xcom_objectstorage_compression).
The backend supports both Airflow 2.x and 3.0+ through conditional imports:
Airflow 2.x: airflow.models.xcom.BaseXCom and airflow.io.path.ObjectStoragePath
Airflow 3.0+: airflow.sdk.bases.xcom.BaseXCom and airflow.sdk.ObjectStoragePath
Requires Airflow 2.9.0 or later for XCom backend functionality.
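Here is a minimal sketch of the conditional-import pattern for these two module layouts. It tries the Airflow 3 locations first and falls back to the Airflow 2.x ones. The provider's actual mechanism may differ; for example, it may use an explicit version check instead of try/except. The AIRFLOW_FLAVOR name is hypothetical, and the final fallback exists only so the snippet runs without Airflow installed.

```python
# Try the Airflow 3 Task SDK locations first, then fall back to Airflow 2.x.
try:
    from airflow.sdk import ObjectStoragePath
    from airflow.sdk.bases.xcom import BaseXCom

    AIRFLOW_FLAVOR = "3.x"
except ImportError:
    try:
        from airflow.io.path import ObjectStoragePath
        from airflow.models.xcom import BaseXCom

        AIRFLOW_FLAVOR = "2.x"
    except ImportError:
        # Airflow not installed: placeholders keep this sketch importable
        ObjectStoragePath = None
        BaseXCom = None
        AIRFLOW_FLAVOR = None

print(AIRFLOW_FLAVOR)
```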
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-common-io