# Containerized Task Decorators

Transform Python functions into containerized tasks with the `@docker_task` decorator. Decorated functions run in isolated Docker environments, with arguments and return values serialized automatically, so containerized execution slots directly into Python function workflows.

## Capabilities

### docker_task Decorator

Convert Python functions into DockerOperator tasks with automatic serialization.

```python { .api }
def docker_task(
    image: str,
    python_command: str = "python",
    serializer: Literal["pickle", "dill", "cloudpickle"] = "pickle",
    multiple_outputs: bool | None = None,
    **kwargs
) -> TaskDecorator:
    """
    Decorator that converts a Python function into a DockerOperator task.

    Args:
        image: Docker image to run the function in
        python_command: Python command to use in container (default: "python")
        serializer: Serialization method for function arguments and return values
        multiple_outputs: Whether the function returns multiple outputs
        **kwargs: All DockerOperator parameters are supported

    Returns:
        TaskDecorator function that creates _DockerDecoratedOperator instances
    """
```

**Parameters:**

- `image`: Docker image containing a Python runtime for function execution
- `python_command`: Python executable command in the container (e.g., "python", "python3", "/opt/python/bin/python")
- `serializer`: Method for serializing function arguments and return values:
  - `"pickle"`: Standard Python pickle (default, fastest)
  - `"dill"`: Extended pickle with broader object support
  - `"cloudpickle"`: Cloud-optimized pickle for distributed computing
- `multiple_outputs`: Set to True if the function returns multiple values as a dictionary
- `**kwargs`: All DockerOperator parameters (environment, mounts, resources, etc.; see the example below)
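
Because every extra keyword argument is forwarded to `DockerOperator`, container behavior can be tuned directly on the decorator. A minimal sketch (parameter availability and types, such as `auto_remove`, can vary with the provider version; the Docker socket path assumes a local daemon):

```python
@docker_task(
    image='python:3.9-slim',
    docker_url='unix://var/run/docker.sock',  # local Docker daemon (assumed default)
    force_pull=False,                         # reuse a locally cached image
    auto_remove='success',                    # a boolean in older provider versions
    environment={'PIPELINE_STAGE': 'dev'},
)
def configured_task():
    """Read configuration injected through DockerOperator kwargs."""
    import os
    return os.environ.get('PIPELINE_STAGE')
```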


### Supported Serializers

```python { .api }
# Available serialization options
Serializer = Literal["pickle", "dill", "cloudpickle"]

# Serializer modules (lazy-loaded)
_SERIALIZERS: dict[Serializer, Any] = {
    "pickle": pickle,
    "dill": dill,                # Requires: pip install dill
    "cloudpickle": cloudpickle   # Requires: pip install cloudpickle
}
```

## Usage Examples

### Basic Function Containerization

```python
from airflow.providers.docker.decorators.docker import docker_task


@docker_task(image='python:3.9-slim')
def hello_world():
    """Simple containerized function."""
    return "Hello from Docker container!"


# Use in DAG
hello_task = hello_world()
```

### Function with Arguments

```python
@docker_task(image='python:3.9')
def process_data(data_list: list, multiplier: int = 2):
    """Process data with arguments."""
    return [x * multiplier for x in data_list]


# Call with arguments
result = process_data([1, 2, 3, 4], multiplier=3)
```

### Scientific Computing Function

```python
@docker_task(
    image='python:3.9',        # the image must have numpy and pandas installed
    serializer='cloudpickle'   # better for scientific objects
)
def analyze_dataset():
    """Perform data analysis using scientific libraries."""
    import numpy as np
    import pandas as pd

    # Generate sample data
    data = np.random.randn(1000, 5)
    df = pd.DataFrame(data, columns=['A', 'B', 'C', 'D', 'E'])

    # Perform analysis
    stats = {
        'mean': df.mean().to_dict(),
        'std': df.std().to_dict(),
        'correlation': df.corr().to_dict()
    }

    return stats


analysis_task = analyze_dataset()
```

### Function with Custom Environment

```python
@docker_task(
    image='python:3.9',   # the image must have the requests package installed
    environment={
        'API_KEY': '{{ var.value.api_key }}',
        'LOG_LEVEL': 'DEBUG',
        'WORKERS': '4'
    }
)
def api_data_fetch(endpoint: str):
    """Fetch data from an API with environment configuration."""
    import os
    import requests

    api_key = os.environ['API_KEY']
    log_level = os.environ.get('LOG_LEVEL', 'INFO')

    response = requests.get(
        endpoint,
        headers={'Authorization': f'Bearer {api_key}'}
    )

    return response.json()


fetch_task = api_data_fetch('https://api.example.com/data')
```

### Function with Volume Mounts

```python
from docker.types import Mount


@docker_task(
    image='python:3.9',
    mounts=[
        Mount(
            source='/host/data',
            target='/app/data',
            type='bind',
            read_only=True
        ),
        Mount(
            source='/host/output',
            target='/app/output',
            type='bind'
        )
    ]
)
def file_processor():
    """Process files from mounted volumes."""
    import os
    import json

    # Read input files
    input_dir = '/app/data'
    output_dir = '/app/output'

    results = []
    for filename in os.listdir(input_dir):
        filepath = os.path.join(input_dir, filename)
        with open(filepath, 'r') as f:
            data = f.read()
        results.append({
            'file': filename,
            'size': len(data),
            'lines': len(data.splitlines())
        })

    # Write results
    output_file = os.path.join(output_dir, 'results.json')
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)

    return results


process_task = file_processor()
```

### Function with Multiple Outputs

```python
@docker_task(
    image='python:3.9',
    multiple_outputs=True
)
def data_pipeline():
    """Process data and return multiple outputs."""
    import random

    # Simulate data processing
    raw_data = [random.randint(1, 100) for _ in range(50)]

    return {
        'processed_data': [x * 2 for x in raw_data],
        'statistics': {
            'count': len(raw_data),
            'mean': sum(raw_data) / len(raw_data),
            'max': max(raw_data),
            'min': min(raw_data)
        },
        'metadata': {
            'processing_version': '1.0',
            'timestamp': '2024-01-01T00:00:00Z'
        }
    }


pipeline_task = data_pipeline()

# Access individual outputs
processed = pipeline_task['processed_data']
stats = pipeline_task['statistics']
meta = pipeline_task['metadata']
```

### GPU-Enabled Function

```python
from docker.types import DeviceRequest


@docker_task(
    image='tensorflow/tensorflow:latest-gpu',
    device_requests=[
        DeviceRequest(count=1, capabilities=[['gpu']])
    ],
    serializer='cloudpickle'
)
def gpu_computation():
    """Perform GPU-accelerated computation."""
    import tensorflow as tf

    # Check GPU availability
    gpus = tf.config.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")

    # Simple GPU computation
    with tf.device('/GPU:0'):
        a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
        b = tf.constant([[2.0, 1.0], [1.0, 2.0]])
        result = tf.matmul(a, b)

    return result.numpy().tolist()


gpu_task = gpu_computation()
```

### Function with Custom Python Environment

```python
@docker_task(
    image='continuumio/miniconda3:latest',        # assumes 'myenv' exists in the image with numpy and pandas installed
    python_command='conda run -n myenv python'
)
def conda_analysis():
    """Run function in a conda environment."""
    import sys
    import numpy as np
    import pandas as pd

    # Conda environment info
    env_info = {
        'python_version': sys.version,
        'numpy_version': np.__version__,
        'pandas_version': pd.__version__
    }

    return env_info


conda_task = conda_analysis()
```

### Function with Dill Serialization

```python
@docker_task(
    image='python:3.9',
    serializer='dill'   # better support for complex objects
)
def complex_object_handler():
    """Handle complex Python objects with dill."""
    import functools

    # Lambdas and closures that standard pickle can't serialize
    def multiplier(factor):
        return lambda x: x * factor

    def scale(x, factor):
        return x * factor

    # Partial functions
    double = functools.partial(scale, factor=2)
    triple = multiplier(3)

    # Nested functions
    def outer_func():
        local_var = 42

        def inner_func():
            return local_var * 2

        return inner_func

    nested = outer_func()

    return {
        'double_result': double(5),   # 10
        'triple_result': triple(5),   # 15
        'nested_result': nested()     # 84
    }


complex_task = complex_object_handler()
```

## Advanced Configuration

### Resource Management

```python
@docker_task(
    image='python:3.9',     # the image must have numpy installed
    mem_limit='2g',
    cpus=2.0,
    shm_size=268435456      # 256MB shared memory
)
def memory_intensive_task():
    """Function with resource constraints."""
    import numpy as np

    # Memory-intensive operation
    large_array = np.random.rand(10000, 10000)
    result = np.sum(large_array)

    return float(result)


resource_task = memory_intensive_task()
```

### Network Configuration

```python
@docker_task(
    image='python:3.9',   # the image must have the requests package installed
    network_mode='host',
    extra_hosts={'database': '192.168.1.100'}
)
def network_service():
    """Function with custom networking."""
    import socket
    import requests

    # Get container hostname
    hostname = socket.gethostname()

    # Make network request to the aliased host
    response = requests.get('http://database:8080/health')

    return {
        'hostname': hostname,
        'database_status': response.status_code
    }


network_task = network_service()
```

## Decorator Internals

### _DockerDecoratedOperator

Internal implementation class (not directly used):

```python { .api }
class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
    """
    Internal class that combines DecoratedOperator and DockerOperator.

    Handles:
    - Function serialization and deserialization
    - Argument passing to containerized function
    - Return value extraction and XCom storage
    - Error handling and logging
    """
```
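
Since return values are extracted and stored in XCom like any TaskFlow task, the output of one decorated function can be passed straight into another and the dependency is wired automatically. A minimal sketch, assuming a recent Airflow 2.x TaskFlow API (the DAG id and schedule are illustrative):

```python
import pendulum
from airflow.decorators import dag
from airflow.providers.docker.decorators.docker import docker_task


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def docker_chain():
    @docker_task(image='python:3.9-slim')
    def extract():
        return [1, 2, 3]

    @docker_task(image='python:3.9-slim')
    def transform(values: list):
        return [v * 10 for v in values]

    # Passing the XComArg creates the extract >> transform dependency
    transform(extract())


docker_chain()
```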


## Serialization Considerations

### Pickle (Default)
- **Pros**: Fast, built-in, handles most common Python objects
- **Cons**: Limited support for complex objects (lambdas, nested functions)
- **Use for**: Simple data types, standard library objects

### Dill
- **Pros**: Extended object support, handles lambdas and nested functions
- **Cons**: Slower than pickle, requires an additional dependency (see the sketch after these lists)
- **Use for**: Complex functions, closures, partial functions

### CloudPickle
- **Pros**: Optimized for distributed computing and cloud environments
- **Cons**: Additional dependency, may be slower for simple objects
- **Use for**: Scientific computing, distributed workflows, cloud deployments
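
Whichever non-default serializer you choose generally has to be available both where the DAG is parsed and inside the container image, since the function and its inputs are deserialized in the container. A minimal sketch, assuming a hypothetical image built with `dill` pre-installed:

```python
@docker_task(
    image='my-registry/python-dill:3.9',  # hypothetical image with dill pre-installed
    serializer='dill',
)
def summarize(records: list):
    """Summarize records shipped into the container via dill."""
    return {'count': len(records), 'total': sum(records)}


summary_task = summarize([3, 5, 8])
```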


## Error Handling

Containerized functions handle errors through:

- **Serialization errors**: Function arguments can't be serialized
- **Execution errors**: Function fails inside the container
- **Deserialization errors**: Return value can't be deserialized
- **Container errors**: Docker container fails to start or execute

All errors are propagated as Airflow task failures with detailed logging.
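
Because failures surface as ordinary Airflow task failures, standard `BaseOperator` arguments passed through `**kwargs` (retries, callbacks, timeouts) apply unchanged. A brief sketch (the callback name is illustrative):

```python
from datetime import timedelta


def notify_on_failure(context):
    # Illustrative callback; plug in real alerting here
    print(f"Task {context['task_instance'].task_id} failed")


@docker_task(
    image='python:3.9-slim',
    retries=2,                             # standard Airflow retry handling
    retry_delay=timedelta(minutes=1),
    on_failure_callback=notify_on_failure,
)
def flaky_task():
    """Occasionally raise to demonstrate retry and failure handling."""
    import random
    if random.random() < 0.5:
        raise RuntimeError("simulated failure inside the container")
    return "ok"
```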