
# File Monitoring


Sensor classes for waiting on and monitoring file or directory availability on FTP servers, with configurable error handling, retry logic, and support for both standard FTP and secure FTPS protocols.


## Capabilities


### FTP Sensor


Primary sensor for monitoring file or directory presence on FTP servers with intelligent error handling and transient error recovery.


```python { .api }
class FTPSensor(BaseSensorOperator):
    """
    Wait for file or directory to be present on FTP server.

    Template Fields: ("path",)

    Parameters:
    - path (str): Remote file or directory path to monitor
    - ftp_conn_id (str): FTP connection ID (default: "ftp_default")
    - fail_on_transient_errors (bool): Fail on transient errors (default: True)
    """

    template_fields: Sequence[str] = ("path",)

    # Transient FTP error codes that can be retried
    transient_errors = [421, 425, 426, 434, 450, 451, 452]
    error_code_pattern = re.compile(r"\d+")

    def __init__(
        self,
        *,
        path: str,
        ftp_conn_id: str = "ftp_default",
        fail_on_transient_errors: bool = True,
        **kwargs,
    ) -> None: ...

    def poke(self, context: Context) -> bool:
        """
        Check if file or directory exists on FTP server.

        Parameters:
        - context (Context): Airflow task context

        Returns:
        bool: True if file/directory exists, False otherwise
        """

    def _create_hook(self) -> FTPHook:
        """
        Create and return FTPHook instance.

        Returns:
        FTPHook: Configured FTP hook
        """

    def _get_error_code(self, e) -> int | Exception:
        """
        Extract numeric error code from FTP exception.

        Parameters:
        - e (Exception): FTP exception

        Returns:
        int | Exception: Extracted error code or original exception
        """
```


### FTPS Sensor


Secure sensor for monitoring files on FTPS servers with SSL/TLS encryption support.


```python { .api }
class FTPSSensor(FTPSensor):
    """
    Wait for file or directory to be present on FTPS server.

    Inherits all FTPSensor functionality with SSL/TLS encryption support.
    """

    def _create_hook(self) -> FTPHook:
        """
        Create and return FTPSHook instance.

        Returns:
        FTPHook: Configured FTPS hook with SSL/TLS support
        """
```


## Usage Examples


### Basic File Monitoring


```python
from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor
from datetime import datetime, timedelta

dag = DAG('ftp_monitoring_example', start_date=datetime(2023, 1, 1))

# Wait for daily data file to appear
wait_for_data = FTPSensor(
    task_id='wait_for_daily_data',
    path='/remote/data/daily_report_{{ ds }}.csv',  # Templated path
    ftp_conn_id='my_ftp_connection',
    poke_interval=60,   # Check every minute
    timeout=3600,       # Time out after 1 hour
    mode='poke',        # Blocking mode
    dag=dag,
)
```


### Directory Monitoring


```python
from airflow.providers.ftp.sensors.ftp import FTPSensor

# Wait for the inbox directory itself to be present
wait_for_directory = FTPSensor(
    task_id='wait_for_directory_content',
    path='/remote/inbox/',  # Monitored directory
    ftp_conn_id='my_ftp_connection',
    poke_interval=300,  # Check every 5 minutes
    timeout=7200,       # Time out after 2 hours
    dag=dag,
)
```


### Secure File Monitoring with FTPS


```python
from airflow.providers.ftp.sensors.ftp import FTPSSensor

# Monitor a secure FTP server
wait_for_secure_file = FTPSSensor(
    task_id='wait_for_secure_data',
    path='/secure/confidential/data.xml',
    ftp_conn_id='my_secure_ftp_connection',  # FTPS connection
    poke_interval=120,  # Check every 2 minutes
    timeout=1800,       # Time out after 30 minutes
    dag=dag,
)
```


### Advanced Error Handling Configuration


```python
from airflow.providers.ftp.sensors.ftp import FTPSensor
from datetime import timedelta

# Handle transient errors gracefully
resilient_sensor = FTPSensor(
    task_id='resilient_file_monitor',
    path='/remote/unreliable_source/data.txt',
    ftp_conn_id='unreliable_ftp',
    fail_on_transient_errors=False,  # Don't fail on transient errors
    poke_interval=180,   # Check every 3 minutes
    timeout=10800,       # Extended timeout (3 hours)
    retries=3,           # Retry on permanent failures
    retry_delay=timedelta(minutes=10),
    dag=dag,
)
```


### Reschedule Mode for Long-Running Monitoring


```python
from airflow.providers.ftp.sensors.ftp import FTPSensor

# Use reschedule mode to free up worker slots between checks
long_running_sensor = FTPSensor(
    task_id='long_running_file_monitor',
    path='/remote/batch_data/weekly_export.zip',
    ftp_conn_id='batch_ftp',
    poke_interval=1800,  # Check every 30 minutes
    timeout=604800,      # Time out after 1 week
    mode='reschedule',   # Non-blocking mode
    dag=dag,
)
```


### Complete Monitoring Pipeline


```python
from airflow import DAG
from airflow.models.baseoperator import cross_downstream
from airflow.providers.ftp.sensors.ftp import FTPSensor, FTPSSensor
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def validate_file():
    """Validate downloaded file format and content."""
    print("Validating file format and content...")
    # File validation logic here
    return True


def notify_completion():
    """Send notification about successful processing."""
    print("Sending completion notification...")
    # Notification logic here


dag = DAG(
    'comprehensive_ftp_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),  # Run every 6 hours
    catchup=False,
)

# Monitor multiple sources simultaneously
wait_for_source1 = FTPSensor(
    task_id='wait_for_source1_data',
    path='/source1/data/{{ ds }}/export.csv',
    ftp_conn_id='source1_ftp',
    poke_interval=300,
    timeout=3600,
    dag=dag,
)

wait_for_source2 = FTPSSensor(  # Secure source
    task_id='wait_for_source2_data',
    path='/secure/source2/{{ ds }}/sensitive_data.xml',
    ftp_conn_id='secure_ftp',
    poke_interval=300,
    timeout=3600,
    dag=dag,
)

# Download files once available
download_source1 = FTPFileTransmitOperator(
    task_id='download_source1',
    ftp_conn_id='source1_ftp',
    remote_filepath='/source1/data/{{ ds }}/export.csv',
    local_filepath='/local/staging/source1_{{ ds }}.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag,
)

download_source2 = FTPFileTransmitOperator(
    task_id='download_source2',
    ftp_conn_id='secure_ftp',
    remote_filepath='/secure/source2/{{ ds }}/sensitive_data.xml',
    local_filepath='/local/staging/source2_{{ ds }}.xml',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag,
)

# Validate downloaded files
validate_files = PythonOperator(
    task_id='validate_files',
    python_callable=validate_file,
    dag=dag,
)

# Send completion notification
notify = PythonOperator(
    task_id='notify_completion',
    python_callable=notify_completion,
    dag=dag,
)

# Define task dependencies; list >> list is not supported in Airflow,
# so use cross_downstream for the many-to-many edge
cross_downstream([wait_for_source1, wait_for_source2], [download_source1, download_source2])
[download_source1, download_source2] >> validate_files >> notify
```


## Error Handling and Recovery


### Transient Error Codes


The sensor recognizes the following FTP error codes as transient. When `fail_on_transient_errors=False`, a poke that hits one of these codes returns False and the sensor keeps checking instead of failing:


- **421**: Service not available, closing control connection
- **425**: Can't open data connection
- **426**: Connection closed; transfer aborted
- **434**: Requested host unavailable
- **450**: Requested file action not taken (file unavailable)
- **451**: Requested action aborted: local error in processing
- **452**: Requested action not taken (insufficient storage)


### Error Handling Strategies


1. **Permanent Errors (like 550 - File not found)**: Sensor returns False and continues poking
2. **Transient Errors**: Behavior depends on the `fail_on_transient_errors` parameter:
   - `True` (default): Raises exception, fails task
   - `False`: Returns False, continues poking
3. **Connection Errors**: Propagated to Airflow for retry handling
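
For intuition, here is a minimal sketch of this decision logic. It assumes an `FTPHook` whose `get_mod_time` raises an `ftplib` error when the path is absent; it is illustrative, not the provider's actual source:

```python
import re
from ftplib import error_perm, error_temp

TRANSIENT_ERRORS = {421, 425, 426, 434, 450, 451, 452}
ERROR_CODE_PATTERN = re.compile(r"\d+")


def _get_error_code(e: Exception) -> int | Exception:
    """Pull the leading numeric code out of an FTP error message, if any."""
    match = ERROR_CODE_PATTERN.match(str(e))
    return int(match.group(0)) if match else e


def poke_sketch(hook, path: str, fail_on_transient_errors: bool = True) -> bool:
    """Return True when path exists, False to keep poking, or raise to fail."""
    try:
        # FTPHook.get_mod_time raises an ftplib error if the path is absent
        hook.get_mod_time(path)
    except error_perm as e:
        if "550" in str(e):
            return False  # file not there yet: keep poking
        raise  # other permanent errors fail the task
    except error_temp as e:
        if not fail_on_transient_errors and _get_error_code(e) in TRANSIENT_ERRORS:
            return False  # tolerated transient error: keep poking
        raise  # default behavior: transient errors fail the task
    return True
```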


### Best Practices


- Use `fail_on_transient_errors=False` for unreliable FTP servers
- Set an appropriate `poke_interval` to balance responsiveness and server load
- Use `mode='reschedule'` for long-running sensors to free worker slots
- Configure retries and retry delays for better fault tolerance
- Monitor sensor logs for patterns in transient errors
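
Taken together, these recommendations might look like the following sketch; the connection ID, path, and interval values are placeholders, and the `dag` object is assumed from the earlier examples:

```python
from datetime import timedelta
from airflow.providers.ftp.sensors.ftp import FTPSensor

# Hypothetical sensor combining the best practices above
tolerant_sensor = FTPSensor(
    task_id='tolerant_file_monitor',
    path='/remote/flaky_source/export.csv',  # placeholder path
    ftp_conn_id='flaky_ftp',                 # placeholder connection ID
    fail_on_transient_errors=False,          # keep poking through transient errors
    poke_interval=600,                       # 10 minutes: balance latency vs. server load
    mode='reschedule',                       # free the worker slot between checks
    timeout=6 * 3600,                        # give up after 6 hours
    retries=2,                               # retry the task on hard failures
    retry_delay=timedelta(minutes=15),
    dag=dag,
)
```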


## Integration with Airflow Features


### Templating Support


The `path` parameter supports Airflow templating:


```python
# Template examples
path='/data/{{ ds }}/daily_report.csv'              # Execution date
path='/data/{{ macros.ds_add(ds, -1) }}/file.txt'   # Previous day
path='/hourly/{{ ts_nodash[:11] }}/data.csv'        # Date plus hour (YYYYMMDDTHH)
```


### Sensor Modes


- **Poke Mode** (default): Blocks a worker slot while waiting
- **Reschedule Mode**: Releases the worker slot between checks; better for long waits


### Connection Management


Sensors obtain their FTP connection through Airflow's connection framework (via `ftp_conn_id`) and release it after each check, so multiple concurrent monitoring tasks do not hold idle connections to the server.
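
The connection IDs used throughout these examples must exist in Airflow's metadata store or environment. One standard mechanism is an `AIRFLOW_CONN_<CONN_ID>` environment variable in URI form; a hypothetical `my_ftp_connection` could be supplied like this (host and credentials are placeholders):

```python
import os

# Hypothetical placeholder credentials: Airflow resolves connections from
# AIRFLOW_CONN_<CONN_ID> environment variables given in URI form.
# In practice, set this in the scheduler/worker environment, not in DAG code.
os.environ["AIRFLOW_CONN_MY_FTP_CONNECTION"] = "ftp://user:password@ftp.example.com:21"
```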