or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

decorators.md, hooks.md, index.md, operators.md, sensors.md, triggers.md

docs/hooks.md

0

# SFTP Hooks

1

2

Core connectivity and file system operations for SFTP servers. The SFTP provider includes both synchronous and asynchronous hooks to support different operational patterns within Airflow workflows.

3

4

## Capabilities

5

6

### Synchronous SFTP Hook

7

8

Main hook for SFTP operations using paramiko, providing comprehensive file system operations and connection management.

9

10

```python { .api }

11

class SFTPHook(SSHHook):

12

"""

13

Interact with SFTP servers using paramiko.

14

15

Inherits from SSHHook and provides SFTP-specific functionality.

16

Supports connection pooling, authentication via SSH keys or passwords,

17

and comprehensive file system operations.

18

"""

19

20

conn_name_attr = "ssh_conn_id"

21

default_conn_name = "sftp_default"

22

conn_type = "sftp"

23

hook_name = "SFTP"

24

25

def __init__(

26

self,

27

ssh_conn_id: str | None = "sftp_default",

28

ssh_hook: SSHHook | None = None,

29

*args,

30

**kwargs,

31

) -> None:

32

"""

33

Initialize SFTP hook.

34

35

Parameters:

36

- ssh_conn_id: Connection ID from Airflow connections

37

- ssh_hook: Optional existing SSH hook (deprecated)

38

"""

39

40

def get_conn(self) -> paramiko.SFTPClient:

41

"""Open an SFTP connection to the remote host."""

42

43

def close_conn(self) -> None:

44

"""Close the SFTP connection."""

45

46

def test_connection(self) -> tuple[bool, str]:

47

"""Test the SFTP connection by calling path with directory."""

48

```

49

50

### UI Configuration

51

52

```python { .api }

53

@classmethod

54

def get_ui_field_behaviour(cls) -> dict[str, Any]:

55

"""

56

Get UI field behavior configuration for Airflow connections.

57

58

Provides configuration for hiding/relabeling connection form fields

59

in the Airflow web UI when creating SFTP connections.

60

61

Returns:

62

Dictionary containing UI field configuration with hidden fields

63

and field relabeling specifications

64

"""

65

```

66

67

### Connection Management

68

69

```python { .api }

70

def get_conn(self) -> paramiko.SFTPClient:

71

"""

72

Open an SFTP connection to the remote host.

73

74

Returns:

75

paramiko.SFTPClient instance for SFTP operations

76

"""

77

78

def close_conn(self) -> None:

79

"""Close the SFTP connection and cleanup resources."""

80

81

def test_connection(self) -> tuple[bool, str]:

82

"""

83

Test the SFTP connection.

84

85

Returns:

86

Tuple of (success: bool, message: str)

87

"""

88

```

89

90

### Directory Operations

91

92

```python { .api }

93

def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None]]:

94

"""

95

Get file information in a directory on the remote system.

96

97

Parameters:

98

- path: Full path to the remote directory

99

100

Returns:

101

Dictionary mapping filenames to their attributes (size, type, modify)

102

"""

103

104

def list_directory(self, path: str) -> list[str]:

105

"""

106

List files in a directory on the remote system.

107

108

Parameters:

109

- path: Full path to the remote directory to list

110

111

Returns:

112

Sorted list of filenames

113

"""

114

115

def mkdir(self, path: str, mode: int = 0o777) -> None:

116

"""

117

Create a directory on the remote system.

118

119

Parameters:

120

- path: Full path to the remote directory to create

121

- mode: Permissions for the new directory, given as an octal integer (default: 0o777)

122

"""

123

124

def create_directory(self, path: str, mode: int = 0o777) -> None:

125

"""

126

Create a directory on the remote system with parent directories.

127

128

Creates parent directories if needed and returns silently if target exists.

129

130

Parameters:

131

- path: Full path to the remote directory to create

132

- mode: Permissions for the new directory, given as an octal integer (default: 0o777)

133

"""

134

135

def delete_directory(self, path: str) -> None:

136

"""

137

Delete a directory on the remote system.

138

139

Parameters:

140

- path: Full path to the remote directory to delete

141

"""

142

```

143

144

### File Operations

145

146

```python { .api }

147

def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:

148

"""

149

Transfer the remote file to a local location.

150

151

Parameters:

152

- remote_full_path: Full path to the remote file

153

- local_full_path: Full path to the local file

154

- prefetch: Controls whether prefetch is performed (default: True)

155

"""

156

157

def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None:

158

"""

159

Transfer a local file to the remote location.

160

161

Parameters:

162

- remote_full_path: Full path to the remote file

163

- local_full_path: Full path to the local file

164

- confirm: Whether to confirm the transfer (default: True)

165

"""

166

167

def delete_file(self, path: str) -> None:

168

"""

169

Remove a file on the server.

170

171

Parameters:

172

- path: Full path to the remote file

173

"""

174

175

def get_mod_time(self, path: str) -> str:

176

"""

177

Get an entry's modification time.

178

179

Parameters:

180

- path: Full path to the remote file

181

182

Returns:

183

Modification time as string in format YYYYMMDDHHMMSS

184

"""

185

```

186

187

### Path Utilities

188

189

```python { .api }

190

def isdir(self, path: str) -> bool:

191

"""

192

Check if the path provided is a directory.

193

194

Parameters:

195

- path: Full path to the remote directory to check

196

197

Returns:

198

True if path is a directory, False otherwise

199

"""

200

201

def isfile(self, path: str) -> bool:

202

"""

203

Check if the path provided is a file.

204

205

Parameters:

206

- path: Full path to the remote file to check

207

208

Returns:

209

True if path is a file, False otherwise

210

"""

211

212

def path_exists(self, path: str) -> bool:

213

"""

214

Check whether a remote entity (file or directory) exists.

215

216

Parameters:

217

- path: Full path to the remote file or directory

218

219

Returns:

220

True if path exists, False otherwise

221

"""

222

```

223

224

### Pattern Matching

225

226

```python { .api }

227

def get_file_by_pattern(self, path, fnmatch_pattern) -> str:

228

"""

229

Get the first matching file based on the given fnmatch type pattern.

230

231

Parameters:

232

- path: Path to be checked

233

- fnmatch_pattern: The pattern that will be matched with fnmatch

234

235

Returns:

236

String containing the first found file, or empty string if none matched

237

"""

238

239

def get_files_by_pattern(self, path, fnmatch_pattern) -> list[str]:

240

"""

241

Get all matching files based on the given fnmatch type pattern.

242

243

Parameters:

244

- path: Path to be checked

245

- fnmatch_pattern: The pattern that will be matched with fnmatch

246

247

Returns:

248

List of strings containing the found files, or empty list if none matched

249

"""

250

251

@staticmethod

252

def _is_path_match(path: str, prefix: str | None = None, delimiter: str | None = None) -> bool:

253

"""

254

Check whether the given path starts with prefix (if set) and ends with delimiter (if set).

255

256

Internal utility method used by get_tree_map for path filtering.

257

258

Parameters:

259

- path: Path to be checked

260

- prefix: If set, path will be checked if starting with prefix

261

- delimiter: If set, path will be checked if ending with delimiter

262

263

Returns:

264

True if path matches criteria, False otherwise

265

"""

266

```

267

268

### Tree Walking

269

270

```python { .api }

271

def walktree(

272

self,

273

path: str,

274

fcallback: Callable[[str], Any | None],

275

dcallback: Callable[[str], Any | None],

276

ucallback: Callable[[str], Any | None],

277

recurse: bool = True,

278

) -> None:

279

"""

280

Recursively descend, depth first, the directory tree at path.

281

282

Calls discrete callback functions for each regular file, directory,

283

and unknown file type.

284

285

Parameters:

286

- path: Root of remote directory to descend, use '.' to start at pwd

287

- fcallback: Callback function to invoke for a regular file

288

- dcallback: Callback function to invoke for a directory

289

- ucallback: Callback function to invoke for an unknown file type

290

- recurse: Whether to recurse into subdirectories (default: True)

291

"""

292

293

def get_tree_map(

294

self, path: str, prefix: str | None = None, delimiter: str | None = None

295

) -> tuple[list[str], list[str], list[str]]:

296

"""

297

Get tuple with recursive lists of files, directories and unknown paths.

298

299

Can filter results by giving prefix and/or delimiter parameters.

300

301

Parameters:

302

- path: Path from which tree will be built

303

- prefix: If set, paths will be added if they start with prefix

304

- delimiter: If set, paths will be added if they end with delimiter

305

306

Returns:

307

Tuple of three lists: files, directories, and unknown items

308

"""

309

```

310

311

### Asynchronous SFTP Hook

312

313

Async hook for SFTP operations using asyncssh, designed for high-performance asynchronous operations.

314

315

```python { .api }

316

class SFTPHookAsync(BaseHook):

317

"""

318

Interact with an SFTP server via asyncssh package.

319

320

Provides asynchronous SFTP operations for improved performance

321

in concurrent scenarios and deferrable operations.

322

"""

323

324

conn_name_attr = "ssh_conn_id"

325

default_conn_name = "sftp_default"

326

conn_type = "sftp"

327

hook_name = "SFTP"

328

default_known_hosts = "~/.ssh/known_hosts"

329

330

def __init__(

331

self,

332

sftp_conn_id: str = default_conn_name,

333

host: str = "",

334

port: int = 22,

335

username: str = "",

336

password: str = "",

337

known_hosts: str = default_known_hosts,

338

key_file: str = "",

339

passphrase: str = "",

340

private_key: str = "",

341

) -> None:

342

"""

343

Initialize async SFTP hook.

344

345

Parameters:

346

- sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server

347

- host: Hostname of the SFTP server

348

- port: Port of the SFTP server (default: 22)

349

- username: Username used when authenticating to the SFTP server

350

- password: Password used when authenticating to the SFTP server

351

- known_hosts: Path to the known_hosts file (default: ~/.ssh/known_hosts)

352

- key_file: Path to the client key file used for authentication

353

- passphrase: Passphrase used with the key_file for authentication

354

- private_key: Private key content as string

355

"""

356

```

357

358

### Asynchronous Operations

359

360

```python { .api }

361

async def list_directory(self, path: str = "") -> list[str] | None:

362

"""

363

Return a list of files on the SFTP server at the provided path.

364

365

Parameters:

366

- path: Path to list (default: current directory)

367

368

Returns:

369

Sorted list of filenames, or None if path doesn't exist

370

"""

371

372

async def read_directory(self, path: str = "") -> Sequence[asyncssh.sftp.SFTPName] | None:

373

"""

374

Return a list of files along with their attributes on the SFTP server.

375

376

Parameters:

377

- path: Path to list (default: current directory)

378

379

Returns:

380

Sequence of SFTPName objects with file attributes, or None if path doesn't exist

381

"""

382

383

async def get_files_and_attrs_by_pattern(

384

self, path: str = "", fnmatch_pattern: str = ""

385

) -> Sequence[asyncssh.sftp.SFTPName]:

386

"""

387

Get the files along with their attributes matching the pattern.

388

389

Parameters:

390

- path: Path to search in

391

- fnmatch_pattern: Pattern to match (e.g., "*.pdf")

392

393

Returns:

394

Sequence of SFTPName objects for files matching the pattern

395

396

Raises:

397

FileNotFoundError: If no files are found at the given path

398

"""

399

400

async def get_mod_time(self, path: str) -> str:

401

"""

402

Get last modified time for the file path.

403

404

Parameters:

405

- path: Full path to the remote file

406

407

Returns:

408

Last modification time as string in format YYYYMMDDHHMMSS

409

410

Raises:

411

AirflowException: If no file matching the given path is found

412

"""

413

```

414

415

## Usage Examples

416

417

### Basic File Operations

418

419

```python

420

from airflow.providers.sftp.hooks.sftp import SFTPHook

421

422

# Initialize hook

423

hook = SFTPHook(ssh_conn_id='my_sftp_conn')

424

425

# Check if file exists

426

if hook.path_exists('/remote/path/file.txt'):

427

# Download file

428

hook.retrieve_file('/remote/path/file.txt', '/local/path/file.txt')

429

430

# Get file modification time

431

mod_time = hook.get_mod_time('/remote/path/file.txt')

432

print(f"File last modified: {mod_time}")

433

434

# Upload file

435

hook.store_file('/remote/path/new_file.txt', '/local/path/data.txt')

436

437

# Clean up connection

438

hook.close_conn()

439

```

440

441

### Directory Operations

442

443

```python

444

from airflow.providers.sftp.hooks.sftp import SFTPHook

445

446

hook = SFTPHook(ssh_conn_id='my_sftp_conn')

447

448

# List directory contents

449

files = hook.list_directory('/remote/path')

450

print(f"Found {len(files)} files")

451

452

# Get detailed file information

453

file_info = hook.describe_directory('/remote/path')

454

for filename, attrs in file_info.items():

455

print(f"{filename}: {attrs['size']} bytes, {attrs['type']}")

456

457

# Create directory with parents

458

hook.create_directory('/remote/path/new/nested/dir')

459

460

# Find files by pattern

461

csv_files = hook.get_files_by_pattern('/remote/path', '*.csv')

462

print(f"Found CSV files: {csv_files}")

463

```

464

465

### Asynchronous Operations

466

467

```python

468

from airflow.providers.sftp.hooks.sftp import SFTPHookAsync

469

470

async def async_file_operations():

471

hook = SFTPHookAsync(sftp_conn_id='my_sftp_conn')

472

473

# List files asynchronously

474

files = await hook.list_directory('/remote/path')

475

if files:

476

print(f"Found {len(files)} files")

477

478

# Get files matching pattern with attributes

479

pdf_files = await hook.get_files_and_attrs_by_pattern(

480

'/remote/path', '*.pdf'

481

)

482

483

for file in pdf_files:

484

mod_time = await hook.get_mod_time(f'/remote/path/{file.filename}')

485

print(f"{file.filename}: modified {mod_time}")

486

```