SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations
npx @tessl/cli install tessl/pypi-apache-airflow-providers-sftp@4.11.00
# Apache Airflow SFTP Provider

A comprehensive Apache Airflow provider package for SSH File Transfer Protocol (SFTP) operations. This provider enables secure file transfers and remote file system operations within Airflow workflows, offering hooks for connectivity, operators for file transfer tasks, sensors for monitoring file presence, triggers for asynchronous operations, and decorators for simplified task creation.

## Package Information

- **Package Name**: apache-airflow-providers-sftp
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-sftp`
- **Dependencies**: `apache-airflow>=2.8.0`, `apache-airflow-providers-ssh>=2.1.0`, `paramiko>=2.9.0`, `asyncssh>=2.12.0`

## Core Imports

```python
from airflow.providers.sftp.hooks.sftp import SFTPHook, SFTPHookAsync
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
```

## Basic Usage

```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Create DAG instance (`schedule` replaces the deprecated `schedule_interval`)
dag = DAG(
    'sftp_example',
    default_args={'retries': 1},
    schedule=timedelta(hours=1),
    start_date=datetime(2023, 1, 1),
)

# Monitor for file existence
sensor = SFTPSensor(
    task_id='check_file_exists',
    path='/remote/path/data.csv',
    sftp_conn_id='sftp_default',
    dag=dag,
)

# Download file from SFTP server
download = SFTPOperator(
    task_id='download_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/path/data.csv',
    remote_filepath='/remote/path/data.csv',
    operation=SFTPOperation.GET,
    dag=dag,
)

# Upload file to SFTP server
upload = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/path/processed_data.csv',
    remote_filepath='/remote/path/processed_data.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)

# Run the sensor first, then download, then upload
sensor >> download >> upload
```

## Architecture

The SFTP provider follows Airflow's standard provider architecture with specialized components:

- **Hooks**: Low-level interfaces for SFTP connections (synchronous and asynchronous)
- **Operators**: Task execution components for file transfer operations
- **Sensors**: Monitoring components for file presence and condition checking
- **Triggers**: Asynchronous components for deferrable operations
- **Decorators**: Simplified task creation interfaces for common patterns

Connection management is handled through Airflow's connection system with the `sftp` connection type, supporting authentication via SSH keys, passwords, and various security configurations.

## Capabilities

### SFTP Hooks

Core connectivity and file system operations including connection management, directory operations, file transfers, and path utilities. Both synchronous and asynchronous hooks are available for different operational patterns.

```python { .api }
class SFTPHook(SSHHook):
    def get_conn(self) -> paramiko.SFTPClient: ...
    def close_conn(self) -> None: ...
    def list_directory(self, path: str) -> list[str]: ...
    def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None: ...
    def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None: ...

class SFTPHookAsync(BaseHook):
    async def list_directory(self, path: str = "") -> list[str] | None: ...
    async def get_mod_time(self, path: str) -> str: ...
```

[SFTP Hooks](./hooks.md)

### File Transfer Operations

Task execution components for uploading and downloading files between local and remote SFTP locations, with support for batch operations, intermediate directory creation, and connection pooling.

```python { .api }
class SFTPOperator(BaseOperator):
    def __init__(
        self,
        *,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        ssh_conn_id: str | None = None,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None: ...

class SFTPOperation:
    PUT = "put"
    GET = "get"
```

[File Transfer Operations](./operators.md)

### File Monitoring

Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers, supporting both blocking and deferrable execution modes.

```python { .api }
class SFTPSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
```

[File Monitoring](./sensors.md)

### Asynchronous Triggers

Trigger components for deferrable SFTP operations that enable efficient resource utilization by yielding control during long-running file monitoring operations.

```python { .api }
class SFTPTrigger(BaseTrigger):
    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None: ...

    async def run(self) -> AsyncIterator[TriggerEvent]: ...
```

[Asynchronous Triggers](./triggers.md)

### Task Decorators

Simplified interfaces for creating SFTP-based tasks using Python decorators, enabling more readable and maintainable DAG definitions for common SFTP operations.

```python { .api }
def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...
```

[Task Decorators](./decorators.md)

## Connection Configuration

SFTP connections are configured through Airflow's connection management system:

- **Connection Type**: `sftp`
- **Default Connection ID**: `sftp_default`
- **Authentication**: SSH keys, passwords, or both
- **Extra Configuration**: Host key verification, known hosts, private keys

## Types

```python { .api }
from typing import Callable, Sequence, Any, AsyncIterator
from datetime import datetime
from paramiko import SFTPClient
from asyncssh.sftp import SFTPName
from airflow.triggers.base import TriggerEvent
from airflow.sensors.base import PokeReturnValue
from airflow.decorators.base import TaskDecorator
```