0
# SFTP Hooks
1
2
Core connectivity and file system operations for SFTP servers. The SFTP provider includes both synchronous and asynchronous hooks to support different operational patterns within Airflow workflows.
3
4
## Capabilities
5
6
### Synchronous SFTP Hook
7
8
Main hook for SFTP operations using paramiko, providing comprehensive file system operations and connection management.
9
10
```python { .api }
11
class SFTPHook(SSHHook):
12
"""
13
Interact with SFTP servers using paramiko.
14
15
Inherits from SSHHook and provides SFTP-specific functionality.
16
Supports connection pooling, authentication via SSH keys or passwords,
17
and comprehensive file system operations.
18
"""
19
20
conn_name_attr = "ssh_conn_id"
21
default_conn_name = "sftp_default"
22
conn_type = "sftp"
23
hook_name = "SFTP"
24
25
def __init__(
26
self,
27
ssh_conn_id: str | None = "sftp_default",
28
ssh_hook: SSHHook | None = None,
29
*args,
30
**kwargs,
31
) -> None:
32
"""
33
Initialize SFTP hook.
34
35
Parameters:
36
- ssh_conn_id: Connection ID from Airflow connections
37
- ssh_hook: Optional existing SSH hook (deprecated)
38
"""
39
40
def get_conn(self) -> paramiko.SFTPClient:
41
"""Open an SFTP connection to the remote host."""
42
43
def close_conn(self) -> None:
44
"""Close the SFTP connection."""
45
46
def test_connection(self) -> tuple[bool, str]:
47
"""Test the SFTP connection by looking up a directory path on the server; returns (success, message)."""
48
```
49
50
### UI Configuration
51
52
```python { .api }
53
@classmethod
54
def get_ui_field_behaviour(cls) -> dict[str, Any]:
55
"""
56
Get UI field behavior configuration for Airflow connections.
57
58
Provides configuration for hiding/relabeling connection form fields
59
in the Airflow web UI when creating SFTP connections.
60
61
Returns:
62
Dictionary containing UI field configuration with hidden fields
63
and field relabeling specifications
64
"""
65
```
66
67
### Connection Management
68
69
```python { .api }
70
def get_conn(self) -> paramiko.SFTPClient:
71
"""
72
Open an SFTP connection to the remote host.
73
74
Returns:
75
paramiko.SFTPClient instance for SFTP operations
76
"""
77
78
def close_conn(self) -> None:
79
"""Close the SFTP connection and cleanup resources."""
80
81
def test_connection(self) -> tuple[bool, str]:
82
"""
83
Test the SFTP connection.
84
85
Returns:
86
Tuple of (success: bool, message: str)
87
"""
88
```
89
90
### Directory Operations
91
92
```python { .api }
93
def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None]]:
94
"""
95
Get file information in a directory on the remote system.
96
97
Parameters:
98
- path: Full path to the remote directory
99
100
Returns:
101
Dictionary mapping filenames to their attributes (size, type, modify)
102
"""
103
104
def list_directory(self, path: str) -> list[str]:
105
"""
106
List files in a directory on the remote system.
107
108
Parameters:
109
- path: Full path to the remote directory to list
110
111
Returns:
112
Sorted list of filenames
113
"""
114
115
def mkdir(self, path: str, mode: int = 0o777) -> None:
116
"""
117
Create a directory on the remote system.
118
119
Parameters:
120
- path: Full path to the remote directory to create
121
- mode: Integer permission bits for the new directory, usually written in octal (default: 0o777)
122
"""
123
124
def create_directory(self, path: str, mode: int = 0o777) -> None:
125
"""
126
Create a directory on the remote system with parent directories.
127
128
Creates parent directories if needed and returns silently if target exists.
129
130
Parameters:
131
- path: Full path to the remote directory to create
132
- mode: Integer permission bits for the new directory, usually written in octal (default: 0o777)
133
"""
134
135
def delete_directory(self, path: str) -> None:
136
"""
137
Delete a directory on the remote system.
138
139
Parameters:
140
- path: Full path to the remote directory to delete
141
"""
142
```
143
144
### File Operations
145
146
```python { .api }
147
def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
148
"""
149
Transfer the remote file to a local location.
150
151
Parameters:
152
- remote_full_path: Full path to the remote file
153
- local_full_path: Full path to the local file
154
- prefetch: Controls whether prefetch is performed (default: True)
155
"""
156
157
def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None:
158
"""
159
Transfer a local file to the remote location.
160
161
Parameters:
162
- remote_full_path: Full path to the remote file
163
- local_full_path: Full path to the local file
164
- confirm: Whether to confirm the transfer (default: True)
165
"""
166
167
def delete_file(self, path: str) -> None:
168
"""
169
Remove a file on the server.
170
171
Parameters:
172
- path: Full path to the remote file
173
"""
174
175
def get_mod_time(self, path: str) -> str:
176
"""
177
Get an entry's modification time.
178
179
Parameters:
180
- path: Full path to the remote file
181
182
Returns:
183
Modification time as string in format YYYYMMDDHHMMSS
184
"""
185
```
186
187
### Path Utilities
188
189
```python { .api }
190
def isdir(self, path: str) -> bool:
191
"""
192
Check if the path provided is a directory.
193
194
Parameters:
195
- path: Full path to the remote directory to check
196
197
Returns:
198
True if path is a directory, False otherwise
199
"""
200
201
def isfile(self, path: str) -> bool:
202
"""
203
Check if the path provided is a file.
204
205
Parameters:
206
- path: Full path to the remote file to check
207
208
Returns:
209
True if path is a file, False otherwise
210
"""
211
212
def path_exists(self, path: str) -> bool:
213
"""
214
Check whether a remote file or directory exists.
215
216
Parameters:
217
- path: Full path to the remote file or directory
218
219
Returns:
220
True if path exists, False otherwise
221
"""
222
```
223
224
### Pattern Matching
225
226
```python { .api }
227
def get_file_by_pattern(self, path, fnmatch_pattern) -> str:
228
"""
229
Get the first matching file based on the given fnmatch type pattern.
230
231
Parameters:
232
- path: Path to be checked
233
- fnmatch_pattern: The pattern that will be matched with fnmatch
234
235
Returns:
236
String containing the first found file, or empty string if none matched
237
"""
238
239
def get_files_by_pattern(self, path, fnmatch_pattern) -> list[str]:
240
"""
241
Get all matching files based on the given fnmatch type pattern.
242
243
Parameters:
244
- path: Path to be checked
245
- fnmatch_pattern: The pattern that will be matched with fnmatch
246
247
Returns:
248
List of strings containing the found files, or empty list if none matched
249
"""
250
251
@staticmethod
252
def _is_path_match(path: str, prefix: str | None = None, delimiter: str | None = None) -> bool:
253
"""
254
Check whether the given path starts with prefix (if set) and ends with delimiter (if set).
255
256
Internal utility method used by get_tree_map for path filtering.
257
258
Parameters:
259
- path: Path to be checked
260
- prefix: If set, path will be checked if starting with prefix
261
- delimiter: If set, path will be checked if ending with delimiter
262
263
Returns:
264
True if path matches criteria, False otherwise
265
"""
266
```
267
268
### Tree Walking
269
270
```python { .api }
271
def walktree(
272
self,
273
path: str,
274
fcallback: Callable[[str], Any | None],
275
dcallback: Callable[[str], Any | None],
276
ucallback: Callable[[str], Any | None],
277
recurse: bool = True,
278
) -> None:
279
"""
280
Recursively descend, depth first, the directory tree at path.
281
282
Calls discrete callback functions for each regular file, directory,
283
and unknown file type.
284
285
Parameters:
286
- path: Root of remote directory to descend, use '.' to start at pwd
287
- fcallback: Callback function to invoke for a regular file
288
- dcallback: Callback function to invoke for a directory
289
- ucallback: Callback function to invoke for an unknown file type
290
- recurse: Should it recurse (default: True)
291
"""
292
293
def get_tree_map(
294
self, path: str, prefix: str | None = None, delimiter: str | None = None
295
) -> tuple[list[str], list[str], list[str]]:
296
"""
297
Get tuple with recursive lists of files, directories and unknown paths.
298
299
Can filter results by giving prefix and/or delimiter parameters.
300
301
Parameters:
302
- path: Path from which tree will be built
303
- prefix: If set, paths will be added if they start with prefix
304
- delimiter: If set, paths will be added if they end with delimiter
305
306
Returns:
307
Tuple with list of files, dirs and unknown items
308
"""
309
```
310
311
### Asynchronous SFTP Hook
312
313
Async hook for SFTP operations using asyncssh, designed for high-performance asynchronous operations.
314
315
```python { .api }
316
class SFTPHookAsync(BaseHook):
317
"""
318
Interact with an SFTP server via asyncssh package.
319
320
Provides asynchronous SFTP operations for improved performance
321
in concurrent scenarios and deferrable operations.
322
"""
323
324
conn_name_attr = "ssh_conn_id"
325
default_conn_name = "sftp_default"
326
conn_type = "sftp"
327
hook_name = "SFTP"
328
default_known_hosts = "~/.ssh/known_hosts"
329
330
def __init__(
331
self,
332
sftp_conn_id: str = default_conn_name,
333
host: str = "",
334
port: int = 22,
335
username: str = "",
336
password: str = "",
337
known_hosts: str = default_known_hosts,
338
key_file: str = "",
339
passphrase: str = "",
340
private_key: str = "",
341
) -> None:
342
"""
343
Initialize async SFTP hook.
344
345
Parameters:
346
- sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
347
- host: Hostname of the SFTP server
348
- port: Port of the SFTP server (default: 22)
349
- username: Username used when authenticating to the SFTP server
350
- password: Password used when authenticating to the SFTP server
351
- known_hosts: Path to the known_hosts file (default: ~/.ssh/known_hosts)
352
- key_file: Path to the client key file used for authentication
353
- passphrase: Passphrase used with the key_file for authentication
354
- private_key: Private key content as string
355
"""
356
```
357
358
### Asynchronous Operations
359
360
```python { .api }
361
async def list_directory(self, path: str = "") -> list[str] | None:
362
"""
363
Return a list of files on the SFTP server at the provided path.
364
365
Parameters:
366
- path: Path to list (default: current directory)
367
368
Returns:
369
Sorted list of filenames, or None if path doesn't exist
370
"""
371
372
async def read_directory(self, path: str = "") -> Sequence[asyncssh.sftp.SFTPName] | None:
373
"""
374
Return a list of files along with their attributes on the SFTP server.
375
376
Parameters:
377
- path: Path to list (default: current directory)
378
379
Returns:
380
Sequence of SFTPName objects with file attributes, or None if path doesn't exist
381
"""
382
383
async def get_files_and_attrs_by_pattern(
384
self, path: str = "", fnmatch_pattern: str = ""
385
) -> Sequence[asyncssh.sftp.SFTPName]:
386
"""
387
Get the files along with their attributes matching the pattern.
388
389
Parameters:
390
- path: Path to search in
391
- fnmatch_pattern: Pattern to match (e.g., "*.pdf")
392
393
Returns:
394
Sequence of SFTPName objects for files matching the pattern
395
396
Raises:
397
FileNotFoundError: If no files are found at the given path
398
"""
399
400
async def get_mod_time(self, path: str) -> str:
401
"""
402
Get last modified time for the file path.
403
404
Parameters:
405
- path: Full path to the remote file
406
407
Returns:
408
Last modification time as string in format YYYYMMDDHHMMSS
409
410
Raises:
411
AirflowException: If no file matches the given path
412
"""
413
```
414
415
## Usage Examples
416
417
### Basic File Operations
418
419
```python
420
from airflow.providers.sftp.hooks.sftp import SFTPHook
421
422
# Initialize hook
423
hook = SFTPHook(ssh_conn_id='my_sftp_conn')
424
425
# Check if file exists
426
if hook.path_exists('/remote/path/file.txt'):
    # Download file
    hook.retrieve_file('/remote/path/file.txt', '/local/path/file.txt')

    # Get file modification time
    mod_time = hook.get_mod_time('/remote/path/file.txt')
    print(f"File last modified: {mod_time}")
433
434
# Upload file
435
hook.store_file('/remote/path/new_file.txt', '/local/path/data.txt')
436
437
# Clean up connection
438
hook.close_conn()
439
```
440
441
### Directory Operations
442
443
```python
444
from airflow.providers.sftp.hooks.sftp import SFTPHook
445
446
hook = SFTPHook(ssh_conn_id='my_sftp_conn')
447
448
# List directory contents
449
files = hook.list_directory('/remote/path')
450
print(f"Found {len(files)} files")
451
452
# Get detailed file information
453
file_info = hook.describe_directory('/remote/path')
454
for filename, attrs in file_info.items():
    print(f"{filename}: {attrs['size']} bytes, {attrs['type']}")
456
457
# Create directory with parents
458
hook.create_directory('/remote/path/new/nested/dir')
459
460
# Find files by pattern
461
csv_files = hook.get_files_by_pattern('/remote/path', '*.csv')
462
print(f"Found CSV files: {csv_files}")
463
```
464
465
### Asynchronous Operations
466
467
```python
468
from airflow.providers.sftp.hooks.sftp import SFTPHookAsync
469
470
async def async_file_operations():
    hook = SFTPHookAsync(sftp_conn_id='my_sftp_conn')

    # List files asynchronously
    files = await hook.list_directory('/remote/path')
    if files:
        print(f"Found {len(files)} files")

    # Get files matching pattern with attributes
    pdf_files = await hook.get_files_and_attrs_by_pattern(
        '/remote/path', '*.pdf'
    )

    for file in pdf_files:
        mod_time = await hook.get_mod_time(f'/remote/path/{file.filename}')
        print(f"{file.filename}: modified {mod_time}")
486
```