# XCom Object Storage Backend

`XComObjectStorageBackend` is an XCom backend that stores each value either in the Airflow database or in object storage, depending on a configurable size threshold. It supports optional compression, removes stored objects when their XCom records are purged, and plugs into Airflow's standard XCom interface.

## Capabilities

### XCom Backend Class

The backend class extends Airflow's `BaseXCom` and adds size-based routing to object storage, with optional compression.

```python { .api }
class XComObjectStorageBackend(BaseXCom):
    """
    XCom backend that stores data in an object store or database depending on the size of the data.

    If the value is larger than the configured threshold, it will be stored in an object store.
    Otherwise, it will be stored in the database. If it is stored in an object store, the path
    to the object in the store will be returned and saved in the database (by BaseXCom).
    """

    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str:
        """
        Serialize value for storage, routing to object storage based on size threshold.

        Parameters:
        - value: The value to serialize
        - key: XCom key identifier
        - task_id: Task identifier
        - dag_id: DAG identifier
        - run_id: DAG run identifier
        - map_index: Task map index for mapped tasks

        Returns:
        - bytes | str: Serialized value or object storage path reference
        """

    @staticmethod
    def deserialize_value(result) -> Any:
        """
        Deserialize value from database or object storage.

        Compression is inferred from the file extension.

        Parameters:
        - result: XCom result from database

        Returns:
        - Any: Deserialized value
        """

    @staticmethod
    def purge(xcom: XComResult, session: Session | None = None) -> None:
        """
        Clean up object storage files when XCom records are deleted.

        Parameters:
        - xcom: XCom result to purge
        - session: Optional database session
        """

    @staticmethod
    def _get_full_path(data: str) -> ObjectStoragePath:
        """
        Get the full object storage path from stored value.

        Parameters:
        - data: Stored path string

        Returns:
        - ObjectStoragePath: Full path object

        Raises:
        - ValueError: If the key is not relative to the configured path
        - TypeError: If the url is not a valid url or cannot be split
        """
```
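
The contract between `serialize_value` and `deserialize_value` (return either the serialized value or a path reference, then resolve that reference on read) can be illustrated with a small standalone sketch. This is not the provider's implementation: the in-memory `FAKE_STORE`, the `xcom://` reference prefix, and the function names `serialize` and `deserialize` are hypothetical stand-ins, and plain JSON is assumed as the serialization format.

```python
import json
import uuid

# Hypothetical stand-in for an object store: maps a path string to raw bytes.
FAKE_STORE: dict[str, bytes] = {}
# Hypothetical marker that distinguishes a path reference from inline data.
REF_PREFIX = "xcom://"


def serialize(value, *, threshold: int) -> str:
    """Return inline JSON for small values, or a path reference for large ones."""
    payload = json.dumps(value)
    encoded = payload.encode("utf-8")
    # A negative threshold means "always keep the value in the database".
    if threshold < 0 or len(encoded) < threshold:
        return payload
    path = f"{REF_PREFIX}{uuid.uuid4()}"
    FAKE_STORE[path] = encoded
    return path


def deserialize(stored: str):
    """Resolve a reference through the fake store, otherwise parse inline JSON."""
    if stored.startswith(REF_PREFIX):
        return json.loads(FAKE_STORE[stored].decode("utf-8"))
    return json.loads(stored)


small = serialize({"count": 42}, threshold=1000)       # stays inline
large = serialize(list(range(1000)), threshold=1000)   # becomes a reference
```

In both cases the caller gets a string back and reads the value through the same `deserialize` call, which is what lets downstream tasks stay unaware of where the data lives.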

### Configuration Helper Functions

Module-level helpers that read the backend's configuration. The three getters are decorated with `@cache`, so each option is looked up once and then reused.

```python { .api }
def _get_compression_suffix(compression: str) -> str:
    """
    Return the compression suffix for the given compression algorithm.

    Parameters:
    - compression: Compression algorithm name

    Returns:
    - str: File extension suffix for compression

    Raises:
    - ValueError: If the compression algorithm is not supported
    """

@cache
def _get_base_path() -> ObjectStoragePath:
    """
    Get the configured base path for object storage.

    Returns:
    - ObjectStoragePath: Base path from configuration
    """

@cache
def _get_compression() -> str | None:
    """
    Get the configured compression algorithm.

    Returns:
    - str | None: Compression algorithm or None if not configured
    """

@cache
def _get_threshold() -> int:
    """
    Get the configured size threshold for object storage.

    Returns:
    - int: Threshold in bytes (-1 for always database, 0 for always object storage)
    """
```

## Configuration

### Required Configuration

The backend reads its settings from the `[common.io]` section of `airflow.cfg`, or from the equivalent environment variables:

```ini
[common.io]
xcom_objectstorage_path = s3://conn_id@my-bucket/xcom-data
# 1 MB threshold, in bytes
xcom_objectstorage_threshold = 1000000
# Optional compression
xcom_objectstorage_compression = gzip
```

### Configuration Options

```python { .api }
# Configuration section: common.io
class XComConfiguration:
    """XCom object storage configuration options."""

    xcom_objectstorage_path: str
    """
    Path to a location on object storage where XComs can be stored in url format.
    Example: "s3://conn_id@bucket/path"
    Default: "" (required for backend to function)
    """

    xcom_objectstorage_threshold: int
    """
    Threshold in bytes for storing XComs in object storage.
    -1: always store in database
    0: always store in object storage
    positive number: store in object storage if size exceeds threshold
    Default: -1
    """

    xcom_objectstorage_compression: str
    """
    Compression algorithm to use when storing XComs in object storage.
    Supported: snappy, zip, gzip, bz2, lzma
    Note: Algorithm must be available in Python installation
    Default: "" (no compression)
    """
```

## Usage Examples

### Backend Configuration

Configure the XCom backend in `airflow.cfg`:

```ini
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://aws_default@my-xcom-bucket/xcom-data
# 1 MiB threshold, in bytes
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
```

### Automatic Usage in Tasks

Once configured, the backend applies to every XCom push and pull without changes to task code:

```python
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG('xcom_example', start_date=datetime(2023, 1, 1))


def produce_large_data(**context):
    # The returned dict is pushed as an XCom; if its serialized size
    # exceeds the threshold, it is written to object storage
    df = pd.DataFrame({'data': range(100000)})
    return df.to_dict()


def consume_data(**context):
    # xcom_pull returns the value itself, wherever it was stored
    data = context['task_instance'].xcom_pull(task_ids='produce_data')
    df = pd.DataFrame(data)
    return len(df)


produce_task = PythonOperator(
    task_id='produce_data',
    python_callable=produce_large_data,
    dag=dag
)

consume_task = PythonOperator(
    task_id='consume_data',
    python_callable=consume_data,
    dag=dag
)

produce_task >> consume_task
```

### Small vs Large Data Handling

```python
def small_data_task(**context):
    # A few dozen bytes: stays in the database for any positive threshold
    return {"status": "success", "count": 42}


def large_data_task(**context):
    # Routed to object storage once its serialized size exceeds
    # the configured threshold
    return {
        "large_list": list(range(50000)),
        "metadata": {"processing_time": "5min"}
    }
```

## Storage Behavior

### Size-Based Routing

1. **Below threshold**: Data stored in Airflow database as usual
2. **Above threshold**: Data compressed (if configured) and stored in object storage
3. **Object storage path**: Stored in database as reference
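
Whether a given payload counts as "large" depends entirely on the configured threshold. The sketch below applies the documented threshold semantics to two example payloads. It assumes plain UTF-8 JSON as the serialized form (Airflow's own serialization may produce a different size) and assumes that a payload exactly at the threshold goes to object storage; `serialized_size` and `destination` are hypothetical helper names.

```python
import json


def serialized_size(value) -> int:
    """Size in bytes of the value encoded as UTF-8 JSON (an assumed encoding)."""
    return len(json.dumps(value).encode("utf-8"))


def destination(size: int, threshold: int) -> str:
    """Apply the documented threshold semantics to a payload size."""
    if threshold < 0:
        return "database"  # -1: always store in the database
    # Assumption: a payload exactly at the threshold goes to object storage.
    return "object storage" if size >= threshold else "database"


small = {"status": "success", "count": 42}
large = {"large_list": list(range(50000)), "metadata": {"processing_time": "5min"}}

for threshold in (-1, 0, 100_000, 1_048_576):
    print(
        threshold,
        destination(serialized_size(small), threshold),
        destination(serialized_size(large), threshold),
    )
```

Under this JSON assumption the 50,000-element payload serializes to roughly 340 KB, so it goes to object storage with a 100,000-byte threshold but stays in the database with a 1 MiB (1,048,576-byte) threshold.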

### File Organization

Objects are stored with the following path structure:

```
{base_path}/{dag_id}/{run_id}/{task_id}/{uuid}.{compression_suffix}
```
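
A path of this shape could be assembled as in the following sketch. `build_object_path` is a hypothetical helper, not part of the provider, and omitting the suffix when no compression is configured is an assumption.

```python
import uuid
from typing import Optional


def build_object_path(
    base_path: str,
    dag_id: str,
    run_id: str,
    task_id: str,
    compression_suffix: Optional[str] = None,
) -> str:
    """Join the documented path components under the base path."""
    filename = str(uuid.uuid4())
    if compression_suffix:
        # Assumption: the suffix is appended only when compression is configured.
        filename = f"{filename}.{compression_suffix}"
    # Plain string joining keeps the "s3://" scheme of the base path intact.
    return "/".join([base_path.rstrip("/"), dag_id, run_id, task_id, filename])


example = build_object_path(
    "s3://conn_id@my-bucket/xcom-data",
    "xcom_example",
    "manual_run",
    "produce_data",
    "gz",
)
```

Because the file name is a random UUID, repeated pushes from the same task instance do not overwrite each other, and the DAG, run, and task components keep related objects grouped under a common prefix.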

### Compression Support

Supported compression algorithms:

- **gzip**: Gzip compression (included in the Python standard library)
- **bz2**: Bzip2 compression (included in the Python standard library)
- **zip**: Zip compression (included in the Python standard library)
- **lzma**: LZMA compression (included in the Python standard library)
- **snappy**: Snappy compression (requires python-snappy)
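
Whether an algorithm is usable depends on the Python installation. The sketch below round-trips a payload through the standard-library codecs and checks whether the optional `snappy` module (installed by python-snappy) can be imported. It only illustrates availability and is not how the backend itself applies compression; `STDLIB_CODECS` and `snappy_available` are hypothetical names.

```python
import bz2
import gzip
import importlib.util
import lzma

# Standard-library codecs keyed by algorithm name: (compress, decompress).
STDLIB_CODECS = {
    "gzip": (gzip.compress, gzip.decompress),
    "bz2": (bz2.compress, bz2.decompress),
    "lzma": (lzma.compress, lzma.decompress),
}


def snappy_available() -> bool:
    """Report whether the optional snappy module can be imported."""
    return importlib.util.find_spec("snappy") is not None


payload = b"x" * 10_000  # highly repetitive, so every codec shrinks it

for name, (compress, decompress) in STDLIB_CODECS.items():
    compressed = compress(payload)
    # Each codec must return the original bytes after a round trip.
    assert decompress(compressed) == payload
    print(name, len(payload), "->", len(compressed))

print("snappy available:", snappy_available())
```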

### Cleanup and Lifecycle

- Objects are automatically purged when XCom records are deleted
- Supports Airflow's standard XCom cleanup mechanisms
- Handles missing files gracefully during deserialization
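
The purge step can be pictured as "delete the object the stored value points to, if it points to one". The sketch below uses the local filesystem and `pathlib` as a stand-in for object storage. `purge_object` is a hypothetical helper, and both the containment check and the tolerance for an already-missing file are assumptions of this sketch rather than documented behavior.

```python
import tempfile
from pathlib import Path


def purge_object(stored_value, root: Path) -> bool:
    """Delete the object a stored XCom value refers to, if it refers to one.

    Returns True when the value was treated as an object reference.
    """
    # Only string values can be path references; anything else is inline data.
    if not isinstance(stored_value, str):
        return False
    target = root / stored_value
    # Refuse references that resolve to a location outside the root.
    if root.resolve() not in target.resolve().parents:
        return False
    target.unlink(missing_ok=True)  # assumption: deleting twice is harmless
    return True


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "dag").mkdir()
    obj = root / "dag" / "value.json"
    obj.write_text("[1, 2, 3]")
    purge_object("dag/value.json", root)
    still_exists = obj.exists()
```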

## Version Compatibility

The backend supports both Airflow 2.x and 3.0+ through conditional imports:

- **Airflow 2.x**: Uses `airflow.models.xcom.BaseXCom` and `airflow.io.path.ObjectStoragePath`
- **Airflow 3.0+**: Uses `airflow.sdk.bases.xcom.BaseXCom` and `airflow.sdk.ObjectStoragePath`
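
One generic way to express such a conditional import is to try a list of dotted paths in order and keep the first that resolves. The helper below is a sketch of that idea, not the provider's actual mechanism (which may select the import by Airflow version instead). `import_first` is a hypothetical name, and the demonstration uses standard-library modules so that it runs without Airflow installed.

```python
from importlib import import_module


def import_first(candidates):
    """Return the first object importable from a list of dotted paths."""
    errors = []
    for dotted in candidates:
        module_name, _, attr = dotted.rpartition(".")
        try:
            return getattr(import_module(module_name), attr)
        except (ImportError, AttributeError) as exc:
            errors.append(f"{dotted}: {exc}")
    raise ImportError("none of the candidates could be imported: " + "; ".join(errors))


# Demonstration with the standard library: the first path does not exist,
# so the second one is returned.
ordered_dict_cls = import_first(["not_a_real_module.Thing", "collections.OrderedDict"])

# With Airflow installed, the same helper could try the 3.0+ location first:
# BaseXCom = import_first(
#     ["airflow.sdk.bases.xcom.BaseXCom", "airflow.models.xcom.BaseXCom"]
# )
```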

The XCom backend requires Airflow 2.9.0 or later.