# Task Decorators

Simplified interfaces for creating SFTP-based tasks using Python decorators. The SFTP provider includes task decorators that enable more readable and maintainable DAG definitions for common SFTP operations, particularly file monitoring scenarios.
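
At a glance, the decorator wraps the monitoring condition and the follow-up logic into a single task definition. A minimal sketch of the pattern (the path, pattern, and connection ID are placeholders; full runnable examples follow under Usage Examples):

```python
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

@sftp_sensor_task(path='/remote/data', file_pattern='*.csv', sftp_conn_id='sftp_default')
def handle_new_files(files_found, **context):
    # files_found is injected by the sensor when matching files appear
    return files_found
```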

## Capabilities

### SFTP Sensor Task Decorator

Task decorator for creating SFTP sensor tasks with simplified syntax and enhanced functionality.

```python { .api }
def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator:
    """
    Wrap a function into an Airflow SFTP sensor operator.

    Creates a decorated task that combines SFTP file monitoring with custom
    Python processing logic. The decorated function receives files_found
    in its keyword arguments when files are detected.

    Parameters:
    - python_callable: Function to decorate and execute when files are found
    - **kwargs: Additional arguments passed to the underlying SFTPSensor

    Returns:
    TaskDecorator that creates _DecoratedSFTPSensor instances
    """
```

### Decorated SFTP Sensor Class

Internal implementation class for decorated SFTP sensor tasks.

```python { .api }
class _DecoratedSFTPSensor(SFTPSensor):
    """
    Wraps a Python callable and captures args/kwargs when called for execution.

    Combines SFTP file monitoring capabilities with custom Python processing.
    Inherits all SFTPSensor functionality while adding decorator-specific
    handling for Python callable execution.
    """

    template_fields: Sequence[str] = ("op_args", "op_kwargs", *SFTPSensor.template_fields)
    custom_operator_name = "@task.sftp_sensor"
    shallow_copy_attrs: Sequence[str] = ("python_callable",)

    def __init__(
        self,
        *,
        task_id: str,
        **kwargs,
    ) -> None:
        """
        Initialize decorated SFTP sensor.

        Parameters:
        - task_id: Unique task identifier
        - **kwargs: Arguments passed to parent SFTPSensor class
        """
```
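
Because `op_args` and `op_kwargs` are template fields, keyword arguments supplied when the decorated task is instantiated can themselves be Jinja templates and are passed to the callable alongside the injected `files_found`. A hedged sketch, assuming the task is defined inside a DAG; the `report_date` argument is illustrative, not part of the provider API:

```python
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

@sftp_sensor_task(path='/remote/data', file_pattern='*.csv', sftp_conn_id='sftp_default')
def summarize(files_found, report_date=None, **context):
    # report_date arrives rendered (e.g. "2023-01-01") because op_kwargs is templated
    return {"report_date": report_date, "file_count": len(files_found)}

# Instantiate with a templated keyword argument
summarize(report_date="{{ ds }}")
```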

## Usage Examples

### Basic SFTP Sensor Decorator

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

@sftp_sensor_task(
    path='/remote/data',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def process_csv_files(files_found, **context):
    """Process CSV files when they are found."""
    print(f"Found {len(files_found)} CSV files: {files_found}")

    # Custom processing logic
    processed_files = []
    for file_path in files_found:
        print(f"Processing file: {file_path}")
        # Add your file processing logic here
        processed_files.append(f"processed_{file_path}")

    return {
        "status": "success",
        "processed_count": len(processed_files),
        "processed_files": processed_files
    }

# Instantiate the decorated task so it is registered in the DAG
process_csv_files()
```

### Advanced File Processing with Decorator

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime, timedelta
import json

dag = DAG(
    'sftp_decorator_advanced',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

@sftp_sensor_task(
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.json',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_default',
    timeout=7200,  # 2 hours
    poke_interval=300,  # 5 minutes
    dag=dag
)
def validate_and_download_exports(files_found, **context):
    """Validate JSON exports and download them for processing."""
    hook = SFTPHook(ssh_conn_id='sftp_default')

    validated_files = []
    invalid_files = []

    for file_path in files_found:
        try:
            # Get file size for validation
            file_info = hook.describe_directory(file_path.rsplit('/', 1)[0])
            filename = file_path.rsplit('/', 1)[1]

            if filename in file_info:
                file_size = file_info[filename]['size']
                if file_size > 100:  # Minimum size check
                    validated_files.append(file_path)
                    print(f"Valid file: {file_path} ({file_size} bytes)")
                else:
                    invalid_files.append(file_path)
                    print(f"Invalid file (too small): {file_path} ({file_size} bytes)")

        except Exception as e:
            print(f"Error validating {file_path}: {e}")
            invalid_files.append(file_path)

    hook.close_conn()

    return {
        "valid_files": validated_files,
        "invalid_files": invalid_files,
        "validation_summary": {
            "total_found": len(files_found),
            "valid_count": len(validated_files),
            "invalid_count": len(invalid_files)
        }
    }

# Instantiate the decorated task so it is registered in the DAG
validate_and_download_exports()
```

### Deferrable Sensor with Custom Processing

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
    max_active_runs=5
)

@sftp_sensor_task(
    path='/remote/realtime_data',
    file_pattern='sensor_data_*.parquet',
    newer_than='{{ ts }}',  # Only files newer than task execution time
    sftp_conn_id='sftp_realtime',
    deferrable=True,  # Use async trigger for resource efficiency
    timeout=3600,
    dag=dag
)
def process_sensor_data(files_found, **context):
    """Process real-time sensor data files."""
    execution_date = context['ds']
    task_instance = context['task_instance']

    print(f"Processing sensor data for {execution_date}")
    print(f"Found {len(files_found)} files: {files_found}")

    # Simulate processing logic
    processing_results = []
    for file_path in files_found:
        # Extract timestamp from filename
        filename = file_path.split('/')[-1]
        if 'sensor_data_' in filename:
            timestamp = filename.replace('sensor_data_', '').replace('.parquet', '')
            processing_results.append({
                "file": file_path,
                "timestamp": timestamp,
                "status": "processed"
            })

    # Push results to XCom for downstream tasks
    task_instance.xcom_push(key='processing_results', value=processing_results)

    return {
        "execution_date": execution_date,
        "files_processed": len(processing_results),
        "processing_results": processing_results
    }

# Instantiate the decorated task so it is registered in the DAG
process_sensor_data()
```

### Multiple Pattern Monitoring with Decorator

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

@sftp_sensor_task(
    path='/remote/mixed_data',
    file_pattern='*',  # Monitor all files
    sftp_conn_id='sftp_default',
    dag=dag
)
def categorize_files(files_found, **context):
    """Categorize found files by type and process accordingly."""

    categorized = {
        'csv_files': [],
        'json_files': [],
        'xml_files': [],
        'other_files': []
    }

    for file_path in files_found:
        filename = file_path.lower()
        if filename.endswith('.csv'):
            categorized['csv_files'].append(file_path)
        elif filename.endswith('.json'):
            categorized['json_files'].append(file_path)
        elif filename.endswith('.xml'):
            categorized['xml_files'].append(file_path)
        else:
            categorized['other_files'].append(file_path)

    # Log categorization results
    for category, files in categorized.items():
        if files:
            print(f"{category}: {len(files)} files")
            for file in files:
                print(f" - {file}")

    return categorized

# Instantiate the decorated task so it is registered in the DAG
categorize_files()
```

### Error Handling in Decorated Tasks

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

@sftp_sensor_task(
    path='/remote/critical_files',
    file_pattern='critical_*.txt',
    newer_than='{{ ds }}T06:00:00',
    sftp_conn_id='sftp_critical',
    timeout=7200,
    retries=2,
    retry_delay=timedelta(minutes=15),
    dag=dag
)
def process_critical_files(files_found, **context):
    """Process critical files with comprehensive error handling."""

    if not files_found:
        raise AirflowException("No critical files found - this should not happen")

    try:
        processed_files = []
        failed_files = []

        for file_path in files_found:
            try:
                # Simulate file processing
                print(f"Processing critical file: {file_path}")

                # Add your critical file processing logic here
                # For example: data validation, format checking, etc.

                # Simulate processing success/failure
                if "invalid" not in file_path.lower():
                    processed_files.append(file_path)
                    print(f"Successfully processed: {file_path}")
                else:
                    failed_files.append(file_path)
                    print(f"Processing failed: {file_path}")

            except Exception as e:
                failed_files.append(file_path)
                print(f"Error processing {file_path}: {e}")

        # Check if any critical files failed
        if failed_files:
            error_msg = f"Failed to process {len(failed_files)} critical files: {failed_files}"
            print(error_msg)
            # Decide whether to fail the task or just warn
            if len(failed_files) > len(processed_files):
                raise AirflowException(error_msg)

        return {
            "total_files": len(files_found),
            "processed_files": processed_files,
            "failed_files": failed_files,
            "success_rate": len(processed_files) / len(files_found) * 100
        }

    except Exception as e:
        print(f"Critical error in file processing: {e}")
        raise AirflowException(f"Critical file processing failed: {e}")

# Instantiate the decorated task so it is registered in the DAG
process_critical_files()
```

### Integration with Downstream Tasks

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_downstream(**context):
    """Process results from decorated SFTP sensor."""
    # Pull results from the decorated sensor
    sensor_results = context['task_instance'].xcom_pull(task_ids='monitor_data_files')

    print(f"Received sensor results: {sensor_results}")

    if sensor_results and 'files_found' in sensor_results:
        files = sensor_results['files_found']
        print(f"Processing {len(files)} files downstream")

        # Add downstream processing logic
        for file_path in files:
            print(f"Downstream processing: {file_path}")

    return "Downstream processing complete"

dag = DAG(
    'sftp_decorator_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=3)
)

# Decorated sensor task
@sftp_sensor_task(
    task_id='monitor_data_files',
    path='/remote/data_pipeline',
    file_pattern='pipeline_*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def monitor_and_validate(files_found, **context):
    """Monitor files and perform initial validation."""
    validated_files = []

    for file_path in files_found:
        # Perform validation logic
        if file_path.endswith('.csv'):
            validated_files.append(file_path)
            print(f"Validated: {file_path}")

    return {
        "files_found": files_found,
        "validated_files": validated_files,
        "validation_count": len(validated_files)
    }

# Downstream processing task
downstream_task = PythonOperator(
    task_id='downstream_processing',
    python_callable=process_downstream,
    dag=dag
)

# Set up task dependencies (calling the decorated function creates the sensor task)
monitor_and_validate() >> downstream_task
```

### Template Usage in Decorators

```python
from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_templating',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

@sftp_sensor_task(
    path='/remote/daily/{{ ds }}',  # Templated path
    file_pattern='data_{{ ds_nodash }}_*.json',  # Templated pattern
    newer_than='{{ ds }}T05:00:00',  # Templated time
    sftp_conn_id='sftp_default',
    timeout=14400,
    dag=dag
)
def process_daily_data(files_found, **context):
    """Process daily data files using Airflow templating."""
    execution_date = context['ds']
    formatted_date = context['ds_nodash']

    print(f"Processing daily data for {execution_date}")
    print(f"Looking for pattern: data_{formatted_date}_*.json")
    print(f"Found {len(files_found)} files")

    daily_summary = {
        "execution_date": execution_date,
        "formatted_date": formatted_date,
        "files_found": files_found,
        "file_count": len(files_found)
    }

    # Process each file
    for file_path in files_found:
        print(f"Processing daily file: {file_path}")
        # Add daily file processing logic

    return daily_summary

# Instantiate the decorated task so it is registered in the DAG
process_daily_data()
```

## Decorator Benefits

### Simplified Syntax

- Combines sensor logic with custom processing in a single function
- Reduces boilerplate code compared to separate sensor and processing tasks (see the comparison sketch after this list)
- Provides cleaner DAG definitions with decorator syntax
- Enables direct access to found files in the decorated function
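
For comparison, a rough sketch of the same wait-then-process flow without the decorator, using a separate SFTPSensor and PythonOperator wired explicitly (the DAG id, task ids, and paths here are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor

def handle_csv_files(**context):
    # Processing logic lives in a second task, separate from the wait condition
    print("Files arrived, processing them here")

with DAG('sftp_without_decorator', start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
    wait_for_files = SFTPSensor(
        task_id='wait_for_csv_files',
        path='/remote/data',
        file_pattern='*.csv',
        sftp_conn_id='sftp_default',
    )
    process_files = PythonOperator(
        task_id='process_csv_files',
        python_callable=handle_csv_files,
    )
    wait_for_files >> process_files
```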

### Enhanced Functionality

- Automatic handling of `files_found` parameter injection
- Seamless integration with Airflow's templating system
- Built-in XCom handling for downstream task communication
- Support for all SFTPSensor parameters and configurations

### Improved Maintainability

- Co-locates sensor configuration with processing logic
- Reduces task dependencies and complex XCom passing
- Provides clear function signatures for custom processing
- Enables better code organization and reusability

## Best Practices

### Function Design

- Keep decorated functions focused on processing found files
- Use descriptive function names that indicate the processing purpose
- Document function parameters and return values clearly
- Handle edge cases like empty file lists gracefully (see the sketch after this list)
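
A minimal sketch of an empty-list guard inside a decorated function. Whether an empty list can actually reach the callable depends on the sensor configuration, so the guard is defensive; `AirflowSkipException` is used here only to illustrate skipping cleanly rather than failing:

```python
from airflow.exceptions import AirflowSkipException
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

@sftp_sensor_task(path='/remote/data', file_pattern='*.csv', sftp_conn_id='sftp_default')
def process_if_any(files_found, **context):
    """Process files, skipping cleanly when there is nothing to do."""
    if not files_found:
        # Skip rather than fail when no files were handed to the callable
        raise AirflowSkipException("No files to process")
    return [f"processed_{path}" for path in files_found]
```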

### Error Handling

- Implement proper exception handling within decorated functions
- Use Airflow exceptions for task failures that should stop the pipeline
- Log processing steps for debugging and monitoring
- Consider partial failure scenarios for batch file processing

### Performance Considerations

- Avoid heavy processing within the decorated function for large file sets
- Consider using the decorator for coordination and separate tasks for processing, as sketched after this list
- Use appropriate timeout values for sensor configuration
- Monitor memory usage when processing file metadata
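
One way to keep heavy work out of the sensor is to return only the file paths and fan the processing out to separate tasks. A hedged sketch assuming Airflow 2.3+ dynamic task mapping; DAG id, task names, and paths are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

with DAG('sftp_coordination_only', start_date=datetime(2023, 1, 1), schedule_interval=None):

    @sftp_sensor_task(path='/remote/data', file_pattern='*.csv', sftp_conn_id='sftp_default')
    def list_new_files(files_found, **context):
        # Coordination only: return the paths, defer heavy work to mapped tasks
        return files_found

    @task
    def process_one(file_path: str):
        # Heavy per-file processing runs in its own task instance
        print(f"Processing {file_path}")

    # Fan out: one mapped task per file found by the sensor
    process_one.expand(file_path=list_new_files())
```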

### Integration Patterns

- Use return values to pass results to downstream tasks via XCom
- Implement consistent return value structures across decorated tasks (see the sketch after this list)
- Consider using the decorator for validation and coordination logic
- Combine with other Airflow operators for complex workflows
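
One possible way to keep return payloads consistent across decorated tasks; the `SensorResult` shape and `build_result` helper are purely illustrative, not part of the provider:

```python
from typing import TypedDict

class SensorResult(TypedDict):
    """Illustrative shared shape for what decorated SFTP tasks return via XCom."""
    files_found: list[str]
    processed_files: list[str]
    status: str

def build_result(files_found, processed_files) -> SensorResult:
    # Hypothetical helper so every decorated task pushes the same structure
    return SensorResult(
        files_found=list(files_found),
        processed_files=list(processed_files),
        status="success" if processed_files else "empty",
    )
```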