# Task Decorators

Create Kubernetes tasks using Python decorators with automatic pod configuration and execution. Decorators provide a simplified interface for running Python functions and shell commands in Kubernetes pods.

## Capabilities

### Kubernetes Task Decorator

Execute Python functions in Kubernetes pods with automatic serialization, pod management, and result handling.

```python { .api }
def kubernetes_task(
    image: str,
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    cmds: list[str] | None = None,
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator to create a Kubernetes task from a Python function.

    This decorator converts a Python function into a KubernetesPodOperator task
    that executes the function inside a Kubernetes pod.

    Args:
        image (str): Docker image to use for the pod
        namespace (str): Kubernetes namespace. Default: 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add a random suffix to the pod name. Default: True
        reattach_on_restart (bool): Reattach to an existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout in seconds. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        cmds (list[str], optional): Container command override
        arguments (list[str], optional): Container arguments override
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete the pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to a pod template file
        pod_template_dict (dict, optional): Pod template as a dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to a kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push the return value to XCom. Default: True
        task_id (str, optional): Task ID override
        **kwargs: Additional arguments passed to the operator

    Returns:
        Decorated function that creates a KubernetesPodOperator task
    """
    ...
```
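
The decorator can also be reached through Airflow's `@task` namespace: recent Airflow releases register the provider's decorator as `@task.kubernetes`, which behaves the same way. A minimal sketch, assuming an Airflow and provider combination that exposes that alias:

```python
# Minimal sketch of the @task.kubernetes form. Assumes an Airflow/provider
# version that registers the "kubernetes" task decorator; verify availability
# in your environment.
from airflow.decorators import task

@task.kubernetes(image='python:3.9-slim', namespace='default')
def hello_from_pod():
    # Runs inside the pod; the return value is pushed to XCom
    # when do_xcom_push is enabled (the default).
    return "hello"
```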

### Kubernetes Command Task Decorator

Execute shell commands in Kubernetes pods with simplified command specification and output handling.

```python { .api }
def kubernetes_cmd_task(
    image: str,
    cmds: list[str],
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator to create a Kubernetes command task.

    This decorator creates a KubernetesPodOperator task that executes
    the specified shell commands inside a Kubernetes pod.

    Args:
        image (str): Docker image to use for the pod
        cmds (list[str]): Shell commands to execute
        namespace (str): Kubernetes namespace. Default: 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add a random suffix to the pod name. Default: True
        reattach_on_restart (bool): Reattach to an existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout in seconds. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        arguments (list[str], optional): Arguments for the commands
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete the pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to a pod template file
        pod_template_dict (dict, optional): Pod template as a dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to a kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push the return value to XCom. Default: True
        task_id (str, optional): Task ID override
        **kwargs: Additional arguments passed to the operator

    Returns:
        Decorated function that creates a KubernetesPodOperator task
    """
    ...
```

### Internal Decorated Operators

Internal operator classes used by the decorators (not typically used directly).

```python { .api }
class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """Internal decorated operator for Kubernetes tasks."""
    ...

class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """Internal decorated operator for Kubernetes command tasks."""
    ...
```

## Usage Examples

### Basic Python Function Execution

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default'
)
def process_data():
    """Simple data processing function."""
    # pandas and numpy must be available in the image (bake them in or install
    # them at runtime); the stock python:3.9-slim image does not include them.
    import pandas as pd
    import numpy as np

    # Create sample data
    data = pd.DataFrame({
        'values': np.random.randn(1000),
        'categories': np.random.choice(['A', 'B', 'C'], 1000)
    })

    # Process data
    result = data.groupby('categories')['values'].mean().to_dict()
    print(f"Processing complete: {result}")

    return result

# Use in DAG
result = process_data()
```

### Function with Dependencies and Packages

```python
@kubernetes_task(
    image='python:3.9',
    namespace='data-processing',
    env_vars=[
        {'name': 'PYTHONPATH', 'value': '/opt/app'},
        {'name': 'DATA_SOURCE', 'value': 'production'}
    ]
)
def analyze_data():
    """Data analysis with external libraries."""
    # Install packages at runtime
    import subprocess
    import sys

    subprocess.check_call([
        sys.executable, '-m', 'pip', 'install',
        'scikit-learn==1.3.0', 'matplotlib==3.7.2'
    ])

    # Now use the packages
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    # Generate sample data
    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)

    print(f"Model accuracy: {accuracy:.4f}")
    return {'accuracy': accuracy, 'n_samples': len(X)}

analysis_result = analyze_data()
```
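
Installing libraries at runtime keeps the example self-contained, but it adds startup latency and a network dependency to every run. A common alternative, sketched below with a hypothetical image name, is to bake the dependencies into a custom image and point the decorator at it:

```python
# Sketch only: 'registry.example.com/analytics-base:1.0' is a hypothetical
# image assumed to already contain scikit-learn.
@kubernetes_task(
    image='registry.example.com/analytics-base:1.0',
    namespace='data-processing'
)
def analyze_data_prebuilt():
    # Imports resolve immediately because the dependencies ship with the image.
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier

    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
    model = RandomForestClassifier(n_estimators=100, random_state=42).fit(X, y)
    return {'train_score': model.score(X, y)}
```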

### Function with Volume Mounts

```python
from kubernetes.client import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    volumes=[
        V1Volume(
            name='data-volume',
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name='shared-data-pvc'
            )
        )
    ],
    volume_mounts=[
        V1VolumeMount(
            name='data-volume',
            mount_path='/data'
        )
    ]
)
def process_files():
    """Process files from mounted volume."""
    import os
    import json

    data_dir = '/data'
    results = []

    # Process all JSON files in the data directory
    for filename in os.listdir(data_dir):
        if filename.endswith('.json'):
            filepath = os.path.join(data_dir, filename)
            with open(filepath, 'r') as f:
                data = json.load(f)
            results.append({
                'file': filename,
                'record_count': len(data) if isinstance(data, list) else 1
            })

    # Write results
    with open('/data/processing_results.json', 'w') as f:
        json.dump(results, f, indent=2)

    return {'processed_files': len(results)}

file_processing = process_files()
```

### Function with Secrets and ConfigMaps

```python
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import V1EnvVar

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    secrets=[
        Secret('env', 'DB_PASSWORD', 'database-secret', 'password'),
        Secret('env', 'API_KEY', 'api-secret', 'key')
    ],
    env_vars=[
        V1EnvVar(name='DB_HOST', value='postgresql.default.svc.cluster.local'),
        V1EnvVar(name='DB_NAME', value='analytics')
    ],
    configmaps=['app-config']
)
def database_operation():
    """Perform database operations with secrets."""
    import os
    import subprocess
    import sys

    # Install database client
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'psycopg2-binary'])

    import psycopg2

    # Get credentials from environment (injected from secrets)
    db_host = os.environ['DB_HOST']
    db_name = os.environ['DB_NAME']
    db_password = os.environ['DB_PASSWORD']

    # Connect and query
    conn = psycopg2.connect(
        host=db_host,
        database=db_name,
        user='analytics_user',
        password=db_password
    )

    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM user_events WHERE created_at >= NOW() - INTERVAL '1 day'")
    daily_events = cursor.fetchone()[0]

    cursor.close()
    conn.close()

    return {'daily_events': daily_events}

db_task = database_operation()
```
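
`Secret` also supports a `volume` deploy type, which mounts the secret's keys as files instead of injecting environment variables. A brief sketch, using hypothetical secret and path names:

```python
# Mount a Kubernetes Secret as files (sketch; 'tls-secret' and '/etc/tls'
# are hypothetical names used for illustration).
tls_files = Secret(
    deploy_type='volume',      # mount as files instead of env vars
    deploy_target='/etc/tls',  # directory inside the container
    secret='tls-secret',       # name of the Kubernetes Secret
)

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    secrets=[tls_files]
)
def read_certificates():
    import os
    # Each key in the Secret appears as a file under /etc/tls
    return {'files': os.listdir('/etc/tls')}
```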

### Command Task Examples

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task

@kubernetes_cmd_task(
    image='ubuntu:20.04',
    cmds=['bash', '-c'],
    arguments=['echo "Starting data backup" && tar -czf /backup/data-$(date +%Y%m%d).tar.gz /data'],
    namespace='backups'
)
def backup_data():
    """Simple backup command."""
    pass

backup_task = backup_data()
```

### Advanced Command Task with Multiple Steps

```python
@kubernetes_cmd_task(
    image='alpine:latest',
    cmds=['sh', '-c'],
    arguments=['''
        set -e
        echo "Installing dependencies..."
        apk add --no-cache curl jq

        echo "Downloading data..."
        curl -o /tmp/data.json "https://api.example.com/data"

        echo "Processing data..."
        cat /tmp/data.json | jq '.results | length'

        echo "Uploading results..."
        curl -X POST -H "Content-Type: application/json" \\
            -d @/tmp/data.json \\
            "https://webhook.example.com/processed"

        echo "Process completed successfully"
    '''],
    namespace='default',
    env_vars=[
        {'name': 'API_ENDPOINT', 'value': 'https://api.example.com'},
        {'name': 'WEBHOOK_URL', 'value': 'https://webhook.example.com'}
    ]
)
def api_data_processor():
    """Multi-step API data processing."""
    pass

api_task = api_data_processor()
```

### Task with Resource Limits

```python
@kubernetes_task(
    image='python:3.9',
    namespace='resource-limited',
    container_resources={
        'requests': {
            'cpu': '100m',
            'memory': '256Mi'
        },
        'limits': {
            'cpu': '500m',
            'memory': '1Gi'
        }
    }
)
def resource_intensive_task():
    """Task with specific resource requirements."""
    import time
    # numpy must be available in the image or installed at runtime;
    # the stock python:3.9 image does not include it.
    import numpy as np

    # Simulate CPU-intensive work; full_matrices=False keeps the factor
    # matrices small enough to stay within the 1Gi memory limit.
    large_array = np.random.randn(10000, 1000)
    result = np.linalg.svd(large_array, full_matrices=False)

    print(f"SVD computation completed. Shape: {result[0].shape}")

    # Simulate some processing time
    time.sleep(10)

    return {'status': 'completed', 'array_size': large_array.shape}

intensive_task = resource_intensive_task()
```

### Task with Node Selection

```python
@kubernetes_task(
    image='tensorflow/tensorflow:2.13.0-gpu',
    namespace='ml-training',
    node_selector={'accelerator': 'nvidia-gpu'},
    tolerations=[
        {
            'key': 'nvidia.com/gpu',
            'operator': 'Exists',
            'effect': 'NoSchedule'
        }
    ],
    container_resources={
        'limits': {
            'nvidia.com/gpu': '1'
        }
    }
)
def gpu_training_task():
    """Machine learning task requiring GPU."""
    import tensorflow as tf

    # Check GPU availability
    gpus = tf.config.experimental.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")

    if gpus:
        # Simple GPU computation
        with tf.device('/GPU:0'):
            matrix_a = tf.random.normal([1000, 1000])
            matrix_b = tf.random.normal([1000, 1000])
            result = tf.matmul(matrix_a, matrix_b)

        print(f"GPU computation completed. Result shape: {result.shape}")
        return {'gpu_used': True, 'result_shape': str(result.shape)}
    else:
        print("No GPU available, using CPU")
        return {'gpu_used': False}

gpu_task = gpu_training_task()
```

### DAG with Multiple Decorated Tasks

```python
from airflow import DAG
from datetime import datetime

dag = DAG(
    'kubernetes_decorated_workflow',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

@kubernetes_task(
    image='python:3.9-slim',
    namespace='data-pipeline',
    dag=dag
)
def extract_data():
    """Extract data from source."""
    import random
    import json

    # Simulate data extraction
    data = [
        {'id': i, 'value': random.randint(1, 100), 'category': random.choice(['A', 'B', 'C'])}
        for i in range(1000)
    ]

    print(f"Extracted {len(data)} records")
    return data

@kubernetes_task(
    image='python:3.9-slim',
    namespace='data-pipeline',
    dag=dag
)
def transform_data(raw_data):
    """Transform extracted data."""
    import statistics

    # Group by category and calculate statistics
    categories = {}
    for record in raw_data:
        cat = record['category']
        if cat not in categories:
            categories[cat] = []
        categories[cat].append(record['value'])

    # Calculate statistics for each category
    stats = {}
    for cat, values in categories.items():
        stats[cat] = {
            'count': len(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'min': min(values),
            'max': max(values)
        }

    print(f"Transformed data for {len(stats)} categories")
    return stats

@kubernetes_cmd_task(
    image='alpine:latest',
    cmds=['sh', '-c'],
    arguments=['echo "Loading data..." && sleep 5 && echo "Data loaded successfully"'],
    namespace='data-pipeline',
    dag=dag
)
def load_data():
    """Load processed data."""
    pass

# Set up task dependencies
raw_data = extract_data()
processed_data = transform_data(raw_data)
load_task = load_data()

# Dependencies
raw_data >> processed_data >> load_task
```
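
For a quick local check of the DAG wiring, newer Airflow versions (2.5 and later) provide `DAG.test()`, which runs the DAG in-process; the Kubernetes tasks themselves still require a reachable cluster and a valid connection. A sketch under those assumptions:

```python
# Sketch: run the DAG once in-process for debugging. Requires Airflow 2.5+
# and, for these tasks, a reachable Kubernetes cluster via the configured
# connection.
if __name__ == "__main__":
    dag.test()
```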