# XCom Object Storage Backend

`XComObjectStorageBackend` is an XCom backend that routes each value to either the Airflow database or object storage, depending on a configurable size threshold. It supports optional compression, removes stored objects when the corresponding XCom records are deleted, and plugs into Airflow's standard XCom interface.

## Capabilities

### XCom Backend Class

The core backend class extends Airflow's `BaseXCom` with size-based routing to object storage and optional compression.

```python { .api }
class XComObjectStorageBackend(BaseXCom):
    """
    XCom backend that stores data in an object store or database depending on the size of the data.

    If the value is larger than the configured threshold, it will be stored in an object store.
    Otherwise, it will be stored in the database. If it is stored in an object store, the path
    to the object in the store will be returned and saved in the database (by BaseXCom).
    """

    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str:
        """
        Serialize value for storage, routing to object storage based on size threshold.

        Parameters:
        - value: The value to serialize
        - key: XCom key identifier
        - task_id: Task identifier
        - dag_id: DAG identifier
        - run_id: DAG run identifier
        - map_index: Task map index for mapped tasks

        Returns:
        - bytes | str: Serialized value or object storage path reference
        """

    @staticmethod
    def deserialize_value(result) -> Any:
        """
        Deserialize value from database or object storage.

        Compression is inferred from the file extension.

        Parameters:
        - result: XCom result from database

        Returns:
        - Any: Deserialized value
        """

    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None:
        """
        Clean up object storage files when XCom records are deleted.

        Parameters:
        - xcom: XCom result to purge
        - session: Optional database session
        """

    @staticmethod
    def _get_full_path(data: str) -> ObjectStoragePath:
        """
        Get the full object storage path from stored value.

        Parameters:
        - data: Stored path string

        Returns:
        - ObjectStoragePath: Full path object

        Raises:
        - ValueError: If the key is not relative to the configured path
        - TypeError: If the url is not a valid url or cannot be split
        """
```
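
To make the documented `_get_full_path` contract more concrete, here is a minimal, standard-library-only sketch of how a stored reference might be checked against the configured base path. It is not the provider's implementation: `resolve_stored_path`, `looks_like_reference`, and `BASE_URL` are hypothetical names, and plain URL strings stand in for `ObjectStoragePath`.

```python
from pathlib import PurePosixPath
from urllib.parse import urlsplit

# Hypothetical base location, standing in for xcom_objectstorage_path.
BASE_URL = "s3://conn_id@my-bucket/xcom-data"


def resolve_stored_path(data: str, base_url: str = BASE_URL) -> str:
    """Return the full URL for a stored reference, or raise ValueError/TypeError."""
    if not isinstance(data, str):
        raise TypeError("stored reference must be a string")
    base = urlsplit(base_url)
    candidate = urlsplit(data)
    # A reference with another scheme or bucket cannot belong to this backend.
    if (candidate.scheme, candidate.netloc) != (base.scheme, base.netloc):
        raise ValueError(f"{data!r} is not located under {base_url!r}")
    # relative_to() raises ValueError when the key is outside the base prefix.
    relative = PurePosixPath(candidate.path).relative_to(PurePosixPath(base.path))
    return f"{base_url.rstrip('/')}/{relative}"


def looks_like_reference(data: object) -> bool:
    """True when a database value points into the configured object storage path."""
    try:
        resolve_stored_path(data)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return False
    return True
```

A check of this kind lets a deserializer tell an inline value (for example a JSON string) apart from a path reference without any extra marker column in the database.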

### Configuration Helper Functions

Module-level helpers for reading the backend configuration. The three configuration getters are decorated with `@cache`, so each option is looked up only once and reused afterwards; `_get_compression_suffix` maps an algorithm name to its file suffix.

```python { .api }
def _get_compression_suffix(compression: str) -> str:
    """
    Return the compression suffix for the given compression algorithm.

    Parameters:
    - compression: Compression algorithm name

    Returns:
    - str: File extension suffix for compression

    Raises:
    - ValueError: If the compression algorithm is not supported
    """

@cache
def _get_base_path() -> ObjectStoragePath:
    """
    Get the configured base path for object storage.

    Returns:
    - ObjectStoragePath: Base path from configuration
    """

@cache
def _get_compression() -> str | None:
    """
    Get the configured compression algorithm.

    Returns:
    - str | None: Compression algorithm or None if not configured
    """

@cache
def _get_threshold() -> int:
    """
    Get the configured size threshold for object storage.

    Returns:
    - int: Threshold in bytes (-1 for always database, 0 for always object storage)
    """
```
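
The helper contract above can be illustrated with a small sketch. The suffix table is a hand-written assumption for illustration (the real backend may obtain it elsewhere), and `read_option` with `FAKE_CONFIG` is a hypothetical stand-in for Airflow's configuration lookup. The point is only that `functools.cache` makes a getter read its option once.

```python
from functools import cache

# Assumed algorithm -> file suffix table, for illustration only.
SUFFIXES = {"gzip": "gz", "bz2": "bz2", "lzma": "xz"}

# Hypothetical stand-in for the [common.io] section of airflow.cfg.
FAKE_CONFIG = {"xcom_objectstorage_threshold": "1048576"}
read_count = 0


def read_option(key: str) -> str:
    """Pretend configuration lookup that counts how often it is called."""
    global read_count
    read_count += 1
    return FAKE_CONFIG[key]


def get_compression_suffix(compression: str) -> str:
    """Map an algorithm name to its suffix, rejecting unknown algorithms."""
    try:
        return SUFFIXES[compression]
    except KeyError:
        raise ValueError(f"Compression {compression!r} is not supported") from None


@cache
def get_threshold() -> int:
    """Read and parse the threshold; the result is memoized after the first call."""
    return int(read_option("xcom_objectstorage_threshold"))


get_threshold()
get_threshold()  # served from the cache; read_option is not called again
```

One consequence of this design is that a configuration change made after the first call is not picked up until the cache is cleared (for example with `get_threshold.cache_clear()`).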

## Configuration

### Required Configuration

Configure the backend in the `[common.io]` section of `airflow.cfg`, or through the equivalent environment variables. Only `xcom_objectstorage_path` has no usable default. With the default threshold of -1 every value stays in the database, so set a threshold as well:

```ini
[common.io]
xcom_objectstorage_path = s3://conn_id@my-bucket/xcom-data
# Values larger than 1,000,000 bytes (1 MB) go to object storage
xcom_objectstorage_threshold = 1000000
# Optional compression
xcom_objectstorage_compression = gzip
```
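
For deployments configured through environment variables, Airflow's general convention is `AIRFLOW__{SECTION}__{KEY}`. The helper below is a hypothetical illustration that derives the variable names for this section. It assumes the dot in `common.io` is replaced by an underscore, so verify the resulting names against the configuration reference of your Airflow version.

```python
def env_var_name(section: str, key: str) -> str:
    """Build the environment variable name for an Airflow option (assumed convention)."""
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


OPTIONS = (
    "xcom_objectstorage_path",
    "xcom_objectstorage_threshold",
    "xcom_objectstorage_compression",
)

# Print the variable name for each option in the [common.io] section.
for option in OPTIONS:
    print(env_var_name("common.io", option))
```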

### Configuration Options

```python { .api }
# Configuration section: common.io
class XComConfiguration:
    """XCom object storage configuration options."""

    xcom_objectstorage_path: str
    """
    Path to a location on object storage where XComs can be stored in url format.
    Example: "s3://conn_id@bucket/path"
    Default: "" (required for backend to function)
    """

    xcom_objectstorage_threshold: int
    """
    Threshold in bytes for storing XComs in object storage.
    -1: always store in database
    0: always store in object storage
    positive number: store in object storage if size exceeds threshold
    Default: -1
    """

    xcom_objectstorage_compression: str
    """
    Compression algorithm to use when storing XComs in object storage.
    Supported: snappy, zip, gzip, bz2, lzma
    Note: Algorithm must be available in Python installation
    Default: "" (no compression)
    """
```
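
The three threshold modes can be summarised in a small pure function. This is a sketch of the documented semantics, not the provider's code: `uses_object_storage` is a hypothetical name, treating every negative value like -1 is an assumption, and so is treating a value whose size equals the threshold as not exceeding it.

```python
def uses_object_storage(size_in_bytes: int, threshold: int) -> bool:
    """Decide where a serialized value of the given size would be stored."""
    if threshold < 0:
        # -1 (assumed here: any negative value) keeps everything in the database.
        return False
    if threshold == 0:
        # 0 sends every value to object storage.
        return True
    # Positive threshold: only values larger than it leave the database.
    return size_in_bytes > threshold
```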

## Usage Examples

### Backend Configuration

Enable the backend under `[core]` and configure it under `[common.io]` in `airflow.cfg`:

```ini
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://aws_default@my-xcom-bucket/xcom-data
# 1 MiB
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
```

### Automatic Usage in Tasks

Once configured, the backend applies to every XCom push and pull without changes to task code:

```python
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG('xcom_example', start_date=datetime(2023, 1, 1))


def produce_large_data(**context):
    # The return value is pushed as an XCom; a large DataFrame dict
    # will be stored in object storage.
    df = pd.DataFrame({'data': range(100000)})
    return df.to_dict()


def consume_data(**context):
    # The data is retrieved from object storage automatically.
    data = context['task_instance'].xcom_pull(task_ids='produce_data')
    df = pd.DataFrame(data)
    return len(df)


produce_task = PythonOperator(
    task_id='produce_data',
    python_callable=produce_large_data,
    dag=dag,
)

consume_task = PythonOperator(
    task_id='consume_data',
    python_callable=consume_data,
    dag=dag,
)

produce_task >> consume_task
```

### Small vs Large Data Handling

```python
def small_data_task(**context):
    # Small data stays in the database.
    return {"status": "success", "count": 42}


def large_data_task(**context):
    # Larger data is routed to object storage once its serialized size exceeds
    # the configured threshold. This payload is only a few hundred kilobytes
    # as JSON, so it is offloaded only if the threshold is set below that.
    return {
        "large_list": list(range(50000)),
        "metadata": {"processing_time": "5min"},
    }
```
246
247
## Storage Behavior
248
249
### Size-Based Routing
250
251
1. **Below threshold**: Data stored in Airflow database as usual
252
2. **Above threshold**: Data compressed (if configured) and stored in object storage
253
3. **Object storage path**: Stored in database as reference
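
The three steps above can be sketched end to end with the local filesystem standing in for object storage. Everything here is illustrative: `store`, `load`, `BASE_DIR`, and `THRESHOLD` are hypothetical names, JSON is assumed as the serialization format, and compression is left out for brevity.

```python
import json
import tempfile
import uuid
from pathlib import Path

BASE_DIR = Path(tempfile.mkdtemp())  # stand-in for the object storage base path
THRESHOLD = 1024  # bytes; stand-in for xcom_objectstorage_threshold


def store(value, dag_id: str, run_id: str, task_id: str) -> str:
    """Return what would be saved in the database: the payload or a path reference."""
    payload = json.dumps(value)
    if len(payload.encode("utf-8")) <= THRESHOLD:
        return payload  # step 1: small values stay in the database
    target = BASE_DIR / dag_id / run_id / task_id / str(uuid.uuid4())
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(payload, encoding="utf-8")  # step 2: large values go to storage
    return json.dumps(str(target))  # step 3: only the reference is kept


def load(stored: str):
    """Resolve a database value back to the original object."""
    data = json.loads(stored)
    if isinstance(data, str):
        path = Path(data)
        # Only follow references that point inside the configured base path.
        if path.is_absolute() and BASE_DIR in path.parents and path.is_file():
            return json.loads(path.read_text(encoding="utf-8"))
    return data


small = store({"status": "success", "count": 42}, "dag", "run", "task")
large = store({"large_list": list(range(50000))}, "dag", "run", "task")
```

In this sketch `small` holds the JSON payload itself, while `large` holds only a path; `load` returns the original object in both cases.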
254
255
### File Organization
256
257
Objects are stored with the following path structure:
258
```
259
{base_path}/{dag_id}/{run_id}/{task_id}/{uuid}.{compression_suffix}
260
```
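
As a sketch of how a key following this layout could be assembled: `build_object_key` is a hypothetical helper, not part of the backend. It assumes that the suffix is simply omitted when no compression is configured, and it uses a plain key prefix rather than a full URL as the base path.

```python
import uuid
from pathlib import PurePosixPath


def build_object_key(base_path: str, dag_id: str, run_id: str, task_id: str, suffix: str = "") -> str:
    """Compose {base_path}/{dag_id}/{run_id}/{task_id}/{uuid}[.{suffix}]."""
    name = str(uuid.uuid4())  # random name, so repeated pushes do not collide
    if suffix:
        name = f"{name}.{suffix}"
    return str(PurePosixPath(base_path, dag_id, run_id, task_id, name))


key = build_object_key("xcom-data", "xcom_example", "manual__2023-01-01", "produce_data", "gz")
```

Grouping by DAG, run, and task keeps all objects of one task instance under a common prefix, which makes manual inspection and bulk deletion straightforward.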

### Compression Support

Supported compression algorithms:
- **gzip**: Gzip compression (Python standard library)
- **bz2**: Bzip2 compression (Python standard library)
- **zip**: Zip compression (Python standard library)
- **lzma**: LZMA compression (Python standard library)
- **snappy**: Snappy compression (requires python-snappy)
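
The interplay between the chosen algorithm and the file suffix can be sketched with the standard library alone. The `OPENERS` table and both helper functions are hypothetical, and the backend itself may delegate this work to its storage layer. The sketch covers only three codecs that ship with CPython and infers the codec from the file suffix when reading, in the spirit of the behaviour described for `deserialize_value`.

```python
import bz2
import gzip
import lzma
import tempfile
from pathlib import Path

# Assumed suffix -> opener table; zip and snappy are omitted to keep the sketch short.
OPENERS = {".gz": gzip.open, ".bz2": bz2.open, ".xz": lzma.open}


def write_compressed(path: Path, payload: bytes) -> None:
    """Write payload, compressing according to the file suffix (plain if unknown)."""
    opener = OPENERS.get(path.suffix, open)
    with opener(path, "wb") as handle:
        handle.write(payload)


def read_inferred(path: Path) -> bytes:
    """Read payload, inferring the codec from the file suffix."""
    opener = OPENERS.get(path.suffix, open)
    with opener(path, "rb") as handle:
        return handle.read()


payload = b'{"large_list": [0, 1, 2]}' * 1000
sizes = {}
with tempfile.TemporaryDirectory() as tmp:
    for suffix in ("", ".gz", ".bz2", ".xz"):
        target = Path(tmp) / f"value{suffix}"
        write_compressed(target, payload)
        assert read_inferred(target) == payload  # round trip is lossless
        sizes[suffix] = target.stat().st_size  # bytes actually written
```

Because the codec is recoverable from the suffix, a reader does not need to know which compression setting was active when the object was written.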

### Cleanup and Lifecycle

- Objects are automatically purged when XCom records are deleted
- Supports Airflow's standard XCom cleanup mechanisms
- Handles missing files gracefully during deserialization
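
A minimal sketch of the purge behaviour, again using the local filesystem in place of object storage. `purge_reference` and `base_dir` are hypothetical, and deleting with `missing_ok=True` is an assumption about how an already missing object could be tolerated, not a statement about the provider's code.

```python
import tempfile
from pathlib import Path


def purge_reference(stored: object, base_dir: Path) -> bool:
    """Delete the object behind a stored reference; return True if a file was removed."""
    if not isinstance(stored, str):
        return False  # non-string values carry no reference to clean up
    path = Path(stored)
    if base_dir not in path.parents:
        return False  # never delete outside the configured base path
    existed = path.is_file()
    path.unlink(missing_ok=True)  # tolerate objects that are already gone
    return existed


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    obj = base / "dag" / "run" / "task" / "value.gz"
    obj.parent.mkdir(parents=True)
    obj.write_bytes(b"payload")
    first = purge_reference(str(obj), base)   # removes the file
    second = purge_reference(str(obj), base)  # file already gone, no error
    inline = purge_reference('{"count": 42}', base)  # inline value, nothing to do
```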

## Version Compatibility

The backend supports both Airflow 2.x and 3.0+ through conditional imports:

- **Airflow 2.x**: Uses `airflow.models.xcom.BaseXCom` and `airflow.io.path.ObjectStoragePath`
- **Airflow 3.0+**: Uses `airflow.sdk.bases.xcom.BaseXCom` and `airflow.sdk.ObjectStoragePath`

The backend requires Airflow 2.9.0 or later.
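
The conditional-import pattern can be sketched without importing Airflow at all. The function below is hypothetical: it only maps a version string to the import locations listed above, and its naive parsing assumes the string starts with a plain `major.minor` prefix.

```python
def import_locations(airflow_version: str) -> dict[str, str]:
    """Return the documented import locations for a given Airflow version string."""
    major, minor = (int(part) for part in airflow_version.split(".")[:2])
    if (major, minor) < (2, 9):
        raise RuntimeError("XComObjectStorageBackend requires Airflow 2.9.0 or later")
    if major >= 3:
        # Airflow 3.0+: classes are exposed through the task SDK.
        return {
            "BaseXCom": "airflow.sdk.bases.xcom",
            "ObjectStoragePath": "airflow.sdk",
        }
    # Airflow 2.9 and later 2.x releases.
    return {
        "BaseXCom": "airflow.models.xcom",
        "ObjectStoragePath": "airflow.io.path",
    }
```

In real code the returned module path would typically be passed to `importlib.import_module` and the class fetched with `getattr`, or the two branches would be written as ordinary `import` statements behind a version check.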