
# File Transfer Operations

Operator classes for performing file uploads, downloads, and transfers between local and remote FTP servers, with Airflow integration, templating support, and OpenLineage compatibility for data lineage tracking.

## Capabilities

### FTP Operation Constants

Constants defining the supported file transfer operations.

```python { .api }
class FTPOperation:
    """Operation types for FTP file transfers."""

    PUT = "put"  # Upload files to the remote server
    GET = "get"  # Download files from the remote server
```

17

18

### FTP File Transfer Operator

19

20

Primary operator for transferring files between local filesystem and FTP servers with support for single files or batch operations.

21

22

```python { .api }

23

class FTPFileTransmitOperator(BaseOperator):

24

"""

25

Transfer files between local and remote FTP locations.

26

27

Template Fields: ("local_filepath", "remote_filepath")

28

29

Parameters:

30

- ftp_conn_id (str): FTP connection ID (default: "ftp_default")

31

- local_filepath (str | list[str]): Local file path(s)

32

- remote_filepath (str | list[str]): Remote file path(s)

33

- operation (str): Transfer direction - FTPOperation.PUT or FTPOperation.GET

34

- create_intermediate_dirs (bool): Create missing directories (default: False)

35

"""

36

37

template_fields: Sequence[str] = ("local_filepath", "remote_filepath")

38

39

def __init__(

40

self,

41

*,

42

ftp_conn_id: str = "ftp_default",

43

local_filepath: str | list[str],

44

remote_filepath: str | list[str],

45

operation: str = FTPOperation.PUT,

46

create_intermediate_dirs: bool = False,

47

**kwargs

48

) -> None: ...

49

50

def execute(self, context: Any) -> str | list[str] | None:

51

"""

52

Execute file transfer operation.

53

54

Parameters:

55

- context (Any): Airflow task context

56

57

Returns:

58

str | list[str] | None: Local filepath(s) after operation

59

"""

60

61

def get_openlineage_facets_on_start(self):

62

"""

63

Return OpenLineage datasets for data lineage tracking.

64

65

Returns:

66

OperatorLineage: Input and output datasets for lineage

67

"""

68

69

@cached_property

70

def hook(self) -> FTPHook:

71

"""

72

Create and return FTPHook instance.

73

74

Returns:

75

FTPHook: Configured FTP hook

76

"""

77

```

### FTPS File Transfer Operator

Secure file transfer operator using FTPS (FTP over SSL/TLS) for encrypted file transfers.

```python { .api }
class FTPSFileTransmitOperator(FTPFileTransmitOperator):
    """
    Transfer files using FTPS (FTP over SSL/TLS) for encrypted transfers.

    Inherits all FTPFileTransmitOperator functionality with SSL/TLS encryption.
    """

    @cached_property
    def hook(self) -> FTPSHook:
        """
        Create and return an FTPSHook instance.

        Returns:
        FTPSHook: Configured FTPS hook with SSL/TLS support
        """
```

## Usage Examples

### Single File Upload

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

dag = DAG('ftp_upload_example', start_date=datetime(2023, 1, 1))

upload_task = FTPFileTransmitOperator(
    task_id='upload_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/output.csv',
    remote_filepath='/remote/uploads/output.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag,
)
```

### Single File Download

```python
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

download_task = FTPFileTransmitOperator(
    task_id='download_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/input.csv',
    remote_filepath='/remote/data/input.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,  # Create /local/data/ if it doesn't exist
    dag=dag,
)
```
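For a download, `create_intermediate_dirs=True` creates any missing parent directories of the local destination before writing the file. A minimal stand-alone sketch of that local-side behavior (the function name here is illustrative, not the provider's actual code; the operator handles this internally):

```python
import os


def ensure_local_dirs(local_filepath: str) -> None:
    """Create missing parent directories for a local download target.

    Illustrative stand-in for what create_intermediate_dirs=True does
    on the local side of a GET operation.
    """
    parent = os.path.dirname(local_filepath)
    if parent:
        # exist_ok=True makes repeated runs idempotent
        os.makedirs(parent, exist_ok=True)
```

With the flag left at its default of `False`, the transfer simply fails if the destination directory does not exist.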

### Batch File Operations

The local and remote filepath lists must have the same length; each local path is paired with the remote path at the same index.

```python
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

# Upload multiple files in one operation
batch_upload = FTPFileTransmitOperator(
    task_id='batch_upload_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv',
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv',
    ],
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)

# Download multiple files in one operation
batch_download = FTPFileTransmitOperator(
    task_id='batch_download_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/downloads/report1.pdf',
        '/local/downloads/report2.pdf',
    ],
    remote_filepath=[
        '/remote/reports/report1.pdf',
        '/remote/reports/report2.pdf',
    ],
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag,
)
```

### Secure File Transfer with FTPS

```python
from airflow.providers.ftp.operators.ftp import FTPSFileTransmitOperator, FTPOperation

secure_upload = FTPSFileTransmitOperator(
    task_id='secure_upload',
    ftp_conn_id='my_secure_ftp_connection',  # Connection configured for FTPS
    local_filepath='/local/sensitive/data.xml',
    remote_filepath='/remote/secure/data.xml',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)
```

### Template Usage with Airflow Variables

```python
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

# Using templated file paths with Airflow variables and macros
templated_transfer = FTPFileTransmitOperator(
    task_id='templated_transfer',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/{{ ds }}/report.csv',  # Uses the execution date
    remote_filepath='/remote/reports/{{ ds }}/report.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)
```
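Airflow renders the template fields with Jinja at runtime, substituting context values such as `{{ ds }}` (the logical date as `YYYY-MM-DD`). A crude pure-Python illustration of that substitution (Airflow's real templating uses the Jinja2 engine and the full task context, not string replacement):

```python
from datetime import date


def render_ds(template: str, logical_date: date) -> str:
    # Stand-in for Airflow's Jinja rendering of {{ ds }}:
    # ds expands to the logical date in YYYY-MM-DD form.
    return template.replace("{{ ds }}", logical_date.isoformat())


render_ds('/local/data/{{ ds }}/report.csv', date(2023, 1, 1))
# -> '/local/data/2023-01-01/report.csv'
```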

### Complete ETL Pipeline Example

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.sensors.ftp import FTPSensor


def process_data():
    # Data processing logic here
    print("Processing downloaded data...")
    return "Data processed successfully"


dag = DAG(
    'ftp_etl_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# Wait for the input file to arrive
wait_for_input = FTPSensor(
    task_id='wait_for_input_file',
    path='/remote/input/{{ ds }}/data.csv',
    ftp_conn_id='source_ftp',
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,  # Time out after 1 hour
    dag=dag,
)

# Download the input file
download_input = FTPFileTransmitOperator(
    task_id='download_input_file',
    ftp_conn_id='source_ftp',
    local_filepath='/local/staging/{{ ds }}/input.csv',
    remote_filepath='/remote/input/{{ ds }}/data.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag,
)

# Process the data
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)

# Upload the processed results
upload_results = FTPFileTransmitOperator(
    task_id='upload_results',
    ftp_conn_id='destination_ftp',
    local_filepath='/local/output/{{ ds }}/processed_data.csv',
    remote_filepath='/remote/output/{{ ds }}/processed_data.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)

# Define task dependencies
wait_for_input >> download_input >> process_task >> upload_results
```

## Error Handling

The operators raise or propagate the following errors:

- **ValueError**: Raised when the local and remote filepath lists have different lengths
- **TypeError**: Raised for unsupported operation values (anything other than GET or PUT)
- **FTP errors**: Propagated from the underlying FTPHook operations
- **Filesystem errors**: Raised when local directories cannot be created or accessed
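The filepath length check can be sketched in plain Python. This is an illustrative mirror of the documented ValueError behavior, not the provider's actual implementation:

```python
def pair_filepaths(local, remote):
    """Pair local and remote paths one-to-one.

    Illustrative only: mirrors the documented list-length check,
    not the operator's actual code.
    """
    # Single strings are normalized to one-element lists
    local_list = [local] if isinstance(local, str) else list(local)
    remote_list = [remote] if isinstance(remote, str) else list(remote)
    if len(local_list) != len(remote_list):
        raise ValueError(
            f"{len(local_list)} local paths cannot be paired with "
            f"{len(remote_list)} remote paths"
        )
    return list(zip(local_list, remote_list))
```

Each local path is transferred to or from the remote path at the same index, so a mismatch in lengths is detected before any transfer begins.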

286

287

## OpenLineage Integration

288

289

When OpenLineage providers are available, the operators automatically generate data lineage information:

290

291

- **Input Datasets**: Source file locations (local for PUT, remote for GET)

292

- **Output Datasets**: Destination file locations (remote for PUT, local for GET)

293

- **Namespace Format**: `file://hostname:port` for proper dataset identification

294

295

This enables comprehensive data lineage tracking across FTP file transfer operations within your data pipelines.
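The PUT/GET dataset mapping above can be sketched as follows. The dict fields and the plain `file://` namespace for the local side are assumptions for illustration; the real operator returns `OperatorLineage` objects built from the OpenLineage provider's dataset classes:

```python
def lineage_datasets(operation, local_path, remote_path, host, port=21):
    """Map the transfer direction to input/output datasets.

    Illustrative sketch of the mapping described above, not the
    provider's actual OpenLineage integration.
    """
    # Local side shown with a bare file:// namespace (an assumption here)
    local = {"namespace": "file://", "name": local_path}
    # Remote side uses the documented file://hostname:port namespace
    remote = {"namespace": f"file://{host}:{port}", "name": remote_path}
    if operation == "put":
        return {"inputs": [local], "outputs": [remote]}  # upload: local -> remote
    return {"inputs": [remote], "outputs": [local]}      # download: remote -> local
```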