# File Transfer Operations

Task execution components for uploading and downloading files between local and remote SFTP locations. The SFTP operator provides robust file transfer capabilities with support for batch operations, intermediate directory creation, and comprehensive error handling.

## Capabilities

### SFTP Operation Constants

Operation type constants for specifying the direction of file transfers.

```python { .api }
class SFTPOperation:
    """Operation that can be used with SFTP."""

    PUT = "put"  # Upload operation from local to remote
    GET = "get"  # Download operation from remote to local
```

### SFTP Operator

Main operator for transferring files between local and remote SFTP locations.

```python { .api }
class SFTPOperator(BaseOperator):
    """
    SFTPOperator for transferring files from remote host to local or vice versa.

    This operator uses the SFTP hook to open an SFTP transport channel that serves
    as the basis for file transfer operations. Supports both single-file and batch
    file transfers with comprehensive configuration options.
    """

    template_fields: Sequence[str] = ("local_filepath", "remote_filepath", "remote_host")

    def __init__(
        self,
        *,
        ssh_hook: SSHHook | None = None,
        sftp_hook: SFTPHook | None = None,
        ssh_conn_id: str | None = None,
        remote_host: str | None = None,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        confirm: bool = True,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize the SFTP operator.

        Parameters:
        - ssh_conn_id: SSH connection ID from Airflow connections
        - sftp_hook: Predefined SFTPHook to use (preferred over ssh_conn_id)
        - ssh_hook: Deprecated - predefined SSHHook to use (use sftp_hook instead)
        - remote_host: Remote host to connect to (templated)
        - local_filepath: Local file path or list of paths to get or put (templated)
        - remote_filepath: Remote file path or list of paths to get or put (templated)
        - operation: Specify operation 'get' or 'put' (default: put)
        - confirm: Specify if the SFTP operation should be confirmed (default: True)
        - create_intermediate_dirs: Create missing intermediate directories (default: False)
        """

    def execute(self, context: Any) -> str | list[str] | None:
        """Execute the file transfer operation."""

    def get_openlineage_facets_on_start(self):
        """Return OpenLineage datasets for lineage tracking."""
```

### File Transfer Execution

```python { .api }
def execute(self, context: Any) -> str | list[str] | None:
    """
    Execute the file transfer operation (PUT or GET).

    Validates the file path lists, establishes the SFTP connection, and performs
    the specified transfer operation with optional intermediate directory creation.

    Parameters:
    - context: Airflow task execution context

    Returns:
    Filepath or list of filepaths that were transferred, or None

    Raises:
    ValueError: If local_filepath and remote_filepath lists have different lengths
    TypeError: If operation is not 'get' or 'put'
    AirflowException: If both ssh_hook and sftp_hook are defined, or the transfer fails
    """
```
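
For batch transfers the two path lists must correspond one-to-one. A minimal sketch of the mismatch that `execute()` rejects (the connection ID and paths are illustrative):

```python
from airflow.providers.sftp.operators.sftp import SFTPOperation, SFTPOperator

# Two local paths but only one remote path: execute() raises ValueError
# because local_filepath and remote_filepath have different lengths.
mismatched = SFTPOperator(
    task_id="mismatched_transfer",
    ssh_conn_id="sftp_default",  # illustrative connection ID
    local_filepath=["/local/a.csv", "/local/b.csv"],
    remote_filepath=["/remote/a.csv"],  # second entry missing
    operation=SFTPOperation.PUT,
)
```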

### Data Lineage Integration

```python { .api }
def get_openlineage_facets_on_start(self):
    """
    Return OpenLineage datasets for data lineage tracking.

    Creates dataset facets for both local and remote file locations
    to enable lineage tracking in OpenLineage-compatible systems.

    Returns:
    OpenLineage facets containing input and output datasets
    """
```
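
The facets are collected automatically when the OpenLineage provider is configured; as a rough sketch for inspection only, assuming an Airflow environment where an `sftp_default` connection actually exists:

```python
from airflow.providers.sftp.operators.sftp import SFTPOperation, SFTPOperator

op = SFTPOperator(
    task_id="lineage_inspection",
    ssh_conn_id="sftp_default",  # must resolve to a real Airflow connection
    local_filepath="/local/data/report.csv",
    remote_filepath="/remote/uploads/report.csv",
    operation=SFTPOperation.PUT,
)

# Expected to return an OperatorLineage-like object whose inputs/outputs
# describe the local and remote file datasets.
lineage = op.get_openlineage_facets_on_start()
print(lineage.inputs, lineage.outputs)
```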

## Usage Examples

### Basic File Upload

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime

dag = DAG(
    'sftp_upload_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

upload_task = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/data/report.csv',
    remote_filepath='/remote/uploads/report.csv',
    operation=SFTPOperation.PUT,
    dag=dag
)
```

### Basic File Download

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime

dag = DAG(
    'sftp_download_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

download_task = SFTPOperator(
    task_id='download_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/downloads/data.csv',
    remote_filepath='/remote/exports/data.csv',
    operation=SFTPOperation.GET,
    dag=dag
)
```

### Batch File Transfer

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

dag = DAG(
    'sftp_batch_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Upload multiple files
batch_upload = SFTPOperator(
    task_id='batch_upload',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv'
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv'
    ],
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag
)

# Download multiple files
batch_download = SFTPOperator(
    task_id='batch_download',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/downloads/result1.json',
        '/local/downloads/result2.json'
    ],
    remote_filepath=[
        '/remote/results/result1.json',
        '/remote/results/result2.json'
    ],
    operation=SFTPOperation.GET,
    dag=dag
)
```

204

205

### Advanced Configuration

206

207

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime

dag = DAG(
    'sftp_advanced_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

# Using a predefined hook for custom configuration
custom_hook = SFTPHook(ssh_conn_id='sftp_custom')

advanced_transfer = SFTPOperator(
    task_id='advanced_transfer',
    sftp_hook=custom_hook,  # Use predefined hook
    remote_host='custom.sftp.server.com',  # Override connection host
    local_filepath='/local/data/{{ ds }}/report.csv',  # Templated path
    remote_filepath='/remote/daily/{{ ds }}/report.csv',  # Templated path
    operation=SFTPOperation.PUT,
    confirm=True,  # Confirm successful transfer
    create_intermediate_dirs=True,  # Create date-based directories
    dag=dag
)
```

234

235

### Error Handling and Retries

236

237

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30)
}

dag = DAG(
    'sftp_resilient_example',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

resilient_transfer = SFTPOperator(
    task_id='resilient_transfer',
    ssh_conn_id='sftp_prod',
    local_filepath='/local/critical/data.parquet',
    remote_filepath='/remote/warehouse/data.parquet',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    # The task will automatically retry on connection or transfer failures
    dag=dag
)
```

267

268

### Integration with Sensors

269

270

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Wait for the source file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_source',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_source',
    timeout=3600,  # Wait up to 1 hour
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Download the file once it's available
download_file = SFTPOperator(
    task_id='download_data',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/data.csv',
    remote_filepath='/remote/incoming/data.csv',
    operation=SFTPOperation.GET,
    dag=dag
)

# Upload the processed file to a different server
upload_processed = SFTPOperator(
    task_id='upload_processed',
    ssh_conn_id='sftp_destination',
    local_filepath='/local/processed/data.csv',
    remote_filepath='/remote/processed/data.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

wait_for_file >> download_file >> upload_processed
```

315

316

### Dynamic File Paths with Templating

317

318

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

dag = DAG(
    'sftp_templating_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

templated_transfer = SFTPOperator(
    task_id='templated_transfer',
    ssh_conn_id='sftp_default',
    # Use Airflow templating for dynamic paths
    local_filepath='/local/data/{{ ds }}/export_{{ ts_nodash }}.csv',
    remote_filepath='/remote/daily/{{ ds }}/export_{{ ts_nodash }}.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)
```

## Best Practices

### Connection Management

- Use connection pooling through Airflow's connection management
- Prefer the `sftp_hook` parameter over the deprecated `ssh_hook`
- Use connection IDs consistently across tasks (see the sketch after this list)
- Configure timeouts appropriately for large file transfers
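
A minimal sketch of sharing one connection ID and one predefined hook across tasks (the `sftp_prod` connection ID is illustrative and must exist in Airflow's connection store):

```python
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.operators.sftp import SFTPOperation, SFTPOperator

SFTP_CONN_ID = "sftp_prod"  # single source of truth for the connection ID
shared_hook = SFTPHook(ssh_conn_id=SFTP_CONN_ID)

upload = SFTPOperator(
    task_id="upload_with_shared_hook",
    sftp_hook=shared_hook,  # preferred over the deprecated ssh_hook
    local_filepath="/local/data/report.csv",
    remote_filepath="/remote/uploads/report.csv",
    operation=SFTPOperation.PUT,
)
```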

### File Path Handling

- Use absolute paths for both local and remote file paths
- Enable `create_intermediate_dirs=True` for dynamic directory structures
- Validate that file path lists have matching lengths for batch operations (see the sketch after this list)
- Use templating for date-based or dynamic file naming
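
A small sketch of building batch path lists so they stay in sync (paths are illustrative):

```python
# Derive both lists from the same sequence so they cannot drift apart,
# then assert the invariant that execute() enforces with a ValueError.
names = ["file1.csv", "file2.csv", "file3.csv"]
local_paths = [f"/local/data/{name}" for name in names]
remote_paths = [f"/remote/uploads/{name}" for name in names]

assert len(local_paths) == len(remote_paths)
```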

### Error Handling

- Configure appropriate retry policies for network-dependent operations
- Use `confirm=True` to verify successful transfers
- Implement downstream validation of transferred files (see the sketch after this list)
- Monitor transfer logs for performance optimization opportunities
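
One way to validate a transfer downstream, sketched with a `PythonOperator` that fails when the downloaded file is missing or empty (the path and task wiring assume the basic download example above):

```python
from pathlib import Path

from airflow.operators.python import PythonOperator


def _check_download(path: str = "/local/downloads/data.csv") -> None:
    # Fail the task, and therefore trigger retries and alerting, if the
    # transfer silently produced a missing or empty file.
    target = Path(path)
    if not target.exists() or target.stat().st_size == 0:
        raise ValueError(f"Transfer validation failed for {path}")


validate_download = PythonOperator(
    task_id="validate_download",
    python_callable=_check_download,
    dag=dag,  # assumes the DAG object from the download example
)

download_task >> validate_download  # assumes download_task from the download example
```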

### Performance Optimization

- Use batch operations for multiple files instead of individual tasks
- Consider file size limitations for single transfers
- Implement prefetching controls for large file downloads
- Use async hooks for I/O-intensive workflows where appropriate (see the sketch after this list)
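
As a hedged sketch of the async option: recent provider versions let `SFTPSensor` defer to the triggerer (backed by the provider's async SFTP hook), freeing a worker slot while waiting. The `deferrable` flag depends on the installed provider version and requires a running triggerer:

```python
from airflow.providers.sftp.sensors.sftp import SFTPSensor

wait_for_file_async = SFTPSensor(
    task_id="wait_for_source_async",
    sftp_conn_id="sftp_source",
    path="/remote/incoming/data.csv",
    poke_interval=300,  # check every 5 minutes while deferred
    deferrable=True,  # availability depends on the provider version installed
)
```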