# Apache Airflow SFTP Provider

A comprehensive Apache Airflow provider package for SSH File Transfer Protocol (SFTP) operations. This provider enables secure file transfers and remote file system operations within Airflow workflows. It offers hooks for connectivity, operators for file transfer tasks, sensors for monitoring file presence, triggers for asynchronous (deferrable) operations, and decorators for simplified task creation.

## Package Information

- **Package Name**: apache-airflow-providers-sftp
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-sftp`
- **Dependencies**: `apache-airflow>=2.8.0`, `apache-airflow-providers-ssh>=2.1.0`, `paramiko>=2.9.0`, `asyncssh>=2.12.0`

## Core Imports

```python
from airflow.providers.sftp.hooks.sftp import SFTPHook, SFTPHookAsync
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
```

## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor

with DAG(
    dag_id='sftp_example',
    default_args={'retries': 1},
    # `schedule` replaces `schedule_interval`, which is deprecated since Airflow 2.4
    schedule=timedelta(hours=1),
    start_date=datetime(2023, 1, 1),
    # Avoid backfilling every hourly interval since the start date
    catchup=False,
):
    # Wait until the remote file exists
    sensor = SFTPSensor(
        task_id='check_file_exists',
        path='/remote/path/data.csv',
        sftp_conn_id='sftp_default',
    )

    # Download the file from the SFTP server
    download = SFTPOperator(
        task_id='download_file',
        ssh_conn_id='sftp_default',
        local_filepath='/local/path/data.csv',
        remote_filepath='/remote/path/data.csv',
        operation=SFTPOperation.GET,
    )

    # Upload the processed file, creating missing remote directories
    upload = SFTPOperator(
        task_id='upload_file',
        ssh_conn_id='sftp_default',
        local_filepath='/local/path/processed_data.csv',
        remote_filepath='/remote/path/processed_data.csv',
        operation=SFTPOperation.PUT,
        create_intermediate_dirs=True,
    )

    sensor >> download >> upload
```

## Architecture

The SFTP provider follows Airflow's standard provider architecture and is made up of these components:

- **Hooks**: Low-level interfaces for SFTP connections (synchronous and asynchronous)
- **Operators**: Task execution components for file transfer operations
- **Sensors**: Monitoring components for file presence and condition checking
- **Triggers**: Asynchronous components for deferrable operations
- **Decorators**: Simplified task creation interfaces for common patterns

Connections are managed through Airflow's connection system using the `sftp` connection type. This type supports authentication with SSH keys or passwords, along with options such as host key verification.

## Capabilities

### SFTP Hooks

Core connectivity and file system operations, including connection management, directory operations, file transfers, and path utilities. Both synchronous and asynchronous hooks are available for different operational patterns.

```python { .api }
class SFTPHook(SSHHook):
    def get_conn(self) -> paramiko.SFTPClient: ...
    def close_conn(self) -> None: ...
    def list_directory(self, path: str) -> list[str]: ...
    def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None: ...
    def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None: ...

class SFTPHookAsync(BaseHook):
    async def list_directory(self, path: str = "") -> list[str] | None: ...
    async def get_mod_time(self, path: str) -> str: ...
```

[SFTP Hooks](./hooks.md)

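The synchronous hook methods listed above are often combined inside a custom task. The following is a minimal sketch that assumes a hook object exposing `list_directory(path)` and `retrieve_file(remote_full_path, local_full_path)` as shown in the API block. The helper name `download_matching` and its glob-style filtering are illustrative and not part of the provider. Because the helper depends only on those two methods, you can exercise it with a stand-in object. In a real task you would pass an `SFTPHook` built from your connection ID.

```python
from __future__ import annotations

import fnmatch
import os
from typing import Protocol


class SupportsSFTPDownload(Protocol):
    """The subset of the SFTPHook interface this sketch relies on."""

    def list_directory(self, path: str) -> list[str]: ...

    def retrieve_file(self, remote_full_path: str, local_full_path: str) -> None: ...


def download_matching(
    hook: SupportsSFTPDownload, remote_dir: str, local_dir: str, pattern: str = "*"
) -> list[str]:
    """Download every file in remote_dir whose name matches a glob pattern.

    Hypothetical helper; returns the local paths written, in sorted name order.
    """
    downloaded = []
    for name in sorted(hook.list_directory(remote_dir)):
        if not fnmatch.fnmatch(name, pattern):
            continue
        # Remote SFTP paths use "/" regardless of the local operating system
        remote_path = f"{remote_dir.rstrip('/')}/{name}"
        local_path = os.path.join(local_dir, name)
        hook.retrieve_file(remote_path, local_path)
        downloaded.append(local_path)
    return downloaded
```

Inside a task, a call such as `download_matching(SFTPHook(ssh_conn_id="sftp_default"), "/remote/path", "/tmp/in", "*.csv")` would fetch every CSV file in `/remote/path`.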

### File Transfer Operations

Task execution components for uploading and downloading files between local and remote SFTP locations, with support for batch operations, intermediate directory creation, and connection pooling.

```python { .api }
class SFTPOperator(BaseOperator):
    def __init__(
        self,
        *,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        ssh_conn_id: str | None = None,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None: ...

class SFTPOperation:
    PUT = "put"
    GET = "get"
```

[File Transfer Operations](./operators.md)

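For batch operations, `local_filepath` and `remote_filepath` accept lists. The operator pairs their entries by position, so both lists need the same length. The following is a minimal sketch of building such paired lists from a set of file names. The helper `paired_paths` and the directory layout are hypothetical, and the sketch assumes POSIX-style paths on both sides.

```python
from __future__ import annotations

import posixpath


def paired_paths(
    filenames: list[str], local_dir: str, remote_dir: str
) -> tuple[list[str], list[str]]:
    """Build equal-length local/remote path lists for a batch SFTP transfer.

    Entry i of the local list corresponds to entry i of the remote list.
    """
    local = [posixpath.join(local_dir, name) for name in filenames]
    remote = [posixpath.join(remote_dir, name) for name in filenames]
    return local, remote


# Hypothetical usage inside a DAG:
# local, remote = paired_paths(["a.csv", "b.csv"], "/local/out", "/remote/in")
# SFTPOperator(
#     task_id="upload_batch",
#     ssh_conn_id="sftp_default",
#     local_filepath=local,
#     remote_filepath=remote,
#     operation=SFTPOperation.PUT,
#     create_intermediate_dirs=True,
# )
```

Deriving both lists from a single list of names keeps the lengths and the ordering aligned by construction.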

### File Monitoring

Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers, supporting both blocking and deferrable execution modes.

```python { .api }
class SFTPSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
```

[File Monitoring](./sensors.md)

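When `file_pattern` is set, the sensor treats `path` as a directory and looks for entries that match the shell-style glob. Setting `newer_than` additionally requires a recent enough modification time. The sketch below re-implements that filtering in plain Python for illustration only; it is not the provider's code. It assumes modification times arrive as `%Y%m%d%H%M%S` strings, the format returned by the hooks' `get_mod_time`, and it treats them as UTC for simplicity. The helper name `matching_files` is hypothetical. In this sketch, a file modified exactly at `newer_than` counts as a match.

```python
from __future__ import annotations

from datetime import datetime, timezone
from fnmatch import fnmatch

MOD_TIME_FORMAT = "%Y%m%d%H%M%S"


def matching_files(
    mod_times: dict[str, str], file_pattern: str, newer_than: datetime | None = None
) -> list[str]:
    """Return sorted file names that satisfy the pattern and freshness checks.

    mod_times maps file name -> modification time string in MOD_TIME_FORMAT,
    interpreted as UTC. newer_than, if given, must be timezone-aware.
    """
    found = []
    for name, raw in sorted(mod_times.items()):
        if not fnmatch(name, file_pattern):
            continue
        if newer_than is not None:
            modified = datetime.strptime(raw, MOD_TIME_FORMAT).replace(tzinfo=timezone.utc)
            # Skip files last modified before the cutoff
            if modified < newer_than:
                continue
        found.append(name)
    return found
```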

### Asynchronous Triggers

Trigger components for deferrable SFTP operations. They free worker slots by handing long-running file monitoring to Airflow's triggerer, which polls asynchronously instead of blocking a worker.

```python { .api }
class SFTPTrigger(BaseTrigger):
    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None: ...

    async def run(self) -> AsyncIterator[TriggerEvent]: ...
```

[Asynchronous Triggers](./triggers.md)

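A trigger's `run` coroutine typically follows a poll-and-sleep pattern. It checks the server and, if nothing is found, awaits `asyncio.sleep(poke_interval)` so the event loop can serve other triggers in the meantime. The following is a minimal sketch of that pattern. It assumes a hook-like object with an async `list_directory(path)`, as on `SFTPHookAsync`. The helper `wait_for_match`, its `timeout` argument, and the dict it returns are hypothetical and do not reproduce the provider's `TriggerEvent` payload.

```python
from __future__ import annotations

import asyncio
from fnmatch import fnmatch
from typing import Protocol


class SupportsAsyncListing(Protocol):
    async def list_directory(self, path: str = "") -> list[str] | None: ...


async def wait_for_match(
    hook: SupportsAsyncListing,
    path: str,
    file_pattern: str,
    poke_interval: float = 5,
    timeout: float = 60,
) -> dict[str, object]:
    """Poll path until an entry matches file_pattern or the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        # A None listing is treated as an empty directory
        names = await hook.list_directory(path) or []
        matches = sorted(n for n in names if fnmatch(n, file_pattern))
        if matches:
            return {"status": "success", "matches": matches}
        if loop.time() >= deadline:
            return {"status": "timeout", "matches": []}
        # Yield to the event loop instead of blocking a thread
        await asyncio.sleep(poke_interval)
```

In practice you rarely write this loop yourself. Passing `deferrable=True` to `SFTPSensor` hands the wait to `SFTPTrigger`.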

### Task Decorators

Simplified interfaces for creating SFTP-based tasks with Python decorators, enabling more readable and maintainable DAG definitions for common SFTP operations.

```python { .api }
def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...
```

[Task Decorators](./decorators.md)

## Connection Configuration

SFTP connections are configured through Airflow's connection management system:

- **Connection Type**: `sftp`
- **Default Connection ID**: `sftp_default`
- **Authentication**: SSH keys, passwords, or both
- **Extra Configuration**: Host key verification, known hosts, private keys

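For local testing, you can also supply a connection through an `AIRFLOW_CONN_<CONN_ID>` environment variable. Airflow 2.3 and later accept this variable in JSON form. The following is a minimal sketch that builds such a value. The host, login, and key path are placeholders, and the helper `sftp_conn_env` is hypothetical. The extra keys `key_file` and `no_host_key_check` are assumed from the SSH provider's connection extras; check the SSH provider documentation for the full list for your version.

```python
from __future__ import annotations

import json
import os


def sftp_conn_env(
    conn_id: str, host: str, login: str, key_file: str, port: int = 22
) -> tuple[str, str]:
    """Return (environment variable name, JSON value) for an sftp connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "sftp",
            "host": host,
            "login": login,
            "port": port,
            "extra": {
                # Assumed SSH-provider extras: private key path, keep host key checking on
                "key_file": key_file,
                "no_host_key_check": "false",
            },
        }
    )
    return name, value


name, value = sftp_conn_env(
    "sftp_default", "sftp.example.com", "airflow", "/home/airflow/.ssh/id_ed25519"
)
# Affects only this process and child processes started after this point
os.environ[name] = value
```

The equivalent shell form is to export the same JSON string as `AIRFLOW_CONN_SFTP_DEFAULT` before starting the scheduler and workers.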

## Types

```python { .api }
from typing import Callable, Sequence, Any, AsyncIterator
from datetime import datetime
from paramiko import SFTPClient
from asyncssh.sftp import SFTPName
from airflow.triggers.base import TriggerEvent
from airflow.sensors.base import PokeReturnValue
from airflow.decorators.base import TaskDecorator
```