SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core connectivity and file system operations for SFTP servers. The SFTP provider includes both synchronous and asynchronous hooks to support different operational patterns within Airflow workflows.
Main hook for SFTP operations using paramiko, providing comprehensive file system operations and connection management.
class SFTPHook(SSHHook):
"""
Interact with SFTP servers using paramiko.
Inherits from SSHHook and provides SFTP-specific functionality.
Supports connection pooling, authentication via SSH keys or passwords,
and comprehensive file system operations.
"""
conn_name_attr = "ssh_conn_id"
default_conn_name = "sftp_default"
conn_type = "sftp"
hook_name = "SFTP"
def __init__(
self,
ssh_conn_id: str | None = "sftp_default",
ssh_hook: SSHHook | None = None,
*args,
**kwargs,
) -> None:
"""
Initialize SFTP hook.
Parameters:
- ssh_conn_id: Connection ID from Airflow connections
- ssh_hook: Optional existing SSH hook (deprecated)
"""
def get_conn(self) -> paramiko.SFTPClient:
"""Open an SFTP connection to the remote host."""
def close_conn(self) -> None:
"""Close the SFTP connection."""
def test_connection(self) -> tuple[bool, str]:
"""Test the SFTP connection by calling path with directory."""

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""
Get UI field behavior configuration for Airflow connections.
Provides configuration for hiding/relabeling connection form fields
in the Airflow web UI when creating SFTP connections.
Returns:
Dictionary containing UI field configuration with hidden fields
and field relabeling specifications
"""

def get_conn(self) -> paramiko.SFTPClient:
"""
Open an SFTP connection to the remote host.
Returns:
paramiko.SFTPClient instance for SFTP operations
"""
def close_conn(self) -> None:
"""Close the SFTP connection and cleanup resources."""
def test_connection(self) -> tuple[bool, str]:
"""
Test the SFTP connection.
Returns:
Tuple of (success: bool, message: str)
"""

def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None]]:
"""
Get file information in a directory on the remote system.
Parameters:
- path: Full path to the remote directory
Returns:
Dictionary mapping filenames to their attributes (size, type, modify)
"""
def list_directory(self, path: str) -> list[str]:
"""
List files in a directory on the remote system.
Parameters:
- path: Full path to the remote directory to list
Returns:
Sorted list of filenames
"""
def mkdir(self, path: str, mode: int = 0o777) -> None:
"""
Create a directory on the remote system.
Parameters:
- path: Full path to the remote directory to create
- mode: Permissions for the new directory, given as an octal integer (default: 0o777)
"""
def create_directory(self, path: str, mode: int = 0o777) -> None:
"""
Create a directory on the remote system with parent directories.
Creates parent directories if needed and returns silently if target exists.
Parameters:
- path: Full path to the remote directory to create
- mode: Permissions for the new directory, given as an octal integer (default: 0o777)
"""
def delete_directory(self, path: str) -> None:
"""
Delete a directory on the remote system.
Parameters:
- path: Full path to the remote directory to delete
"""

def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
"""
Transfer the remote file to a local location.
Parameters:
- remote_full_path: Full path to the remote file
- local_full_path: Full path to the local file
- prefetch: Controls whether prefetch is performed (default: True)
"""
def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None:
"""
Transfer a local file to the remote location.
Parameters:
- remote_full_path: Full path to the remote file
- local_full_path: Full path to the local file
- confirm: Whether to confirm the transfer (default: True)
"""
def delete_file(self, path: str) -> None:
"""
Remove a file on the server.
Parameters:
- path: Full path to the remote file
"""
def get_mod_time(self, path: str) -> str:
"""
Get an entry's modification time.
Parameters:
- path: Full path to the remote file
Returns:
Modification time as string in format YYYYMMDDHHMMSS
"""

def isdir(self, path: str) -> bool:
"""
Check if the path provided is a directory.
Parameters:
- path: Full path to the remote directory to check
Returns:
True if path is a directory, False otherwise
"""
def isfile(self, path: str) -> bool:
"""
Check if the path provided is a file.
Parameters:
- path: Full path to the remote file to check
Returns:
True if path is a file, False otherwise
"""
def path_exists(self, path: str) -> bool:
"""
Whether a remote entity exists.
Parameters:
- path: Full path to the remote file or directory
Returns:
True if path exists, False otherwise
"""

def get_file_by_pattern(self, path, fnmatch_pattern) -> str:
"""
Get the first matching file based on the given fnmatch type pattern.
Parameters:
- path: Path to be checked
- fnmatch_pattern: The pattern that will be matched with fnmatch
Returns:
String containing the first found file, or empty string if none matched
"""
def get_files_by_pattern(self, path, fnmatch_pattern) -> list[str]:
"""
Get all matching files based on the given fnmatch type pattern.
Parameters:
- path: Path to be checked
- fnmatch_pattern: The pattern that will be matched with fnmatch
Returns:
List of strings containing the found files, or empty list if none matched
"""
@staticmethod
def _is_path_match(path: str, prefix: str | None = None, delimiter: str | None = None) -> bool:
"""
Whether given path starts with prefix (if set) and ends with delimiter (if set).
Internal utility method used by get_tree_map for path filtering.
Parameters:
- path: Path to be checked
- prefix: If set, path will be checked if starting with prefix
- delimiter: If set, path will be checked if ending with delimiter
Returns:
True if path matches criteria, False otherwise
"""

def walktree(
self,
path: str,
fcallback: Callable[[str], Any | None],
dcallback: Callable[[str], Any | None],
ucallback: Callable[[str], Any | None],
recurse: bool = True,
) -> None:
"""
Recursively descend, depth first, the directory tree at path.
Calls discrete callback functions for each regular file, directory,
and unknown file type.
Parameters:
- path: Root of remote directory to descend, use '.' to start at pwd
- fcallback: Callback function to invoke for a regular file
- dcallback: Callback function to invoke for a directory
- ucallback: Callback function to invoke for an unknown file type
- recurse: Should it recurse (default: True)
"""
def get_tree_map(
self, path: str, prefix: str | None = None, delimiter: str | None = None
) -> tuple[list[str], list[str], list[str]]:
"""
Get tuple with recursive lists of files, directories and unknown paths.
Can filter results by giving prefix and/or delimiter parameters.
Parameters:
- path: Path from which tree will be built
- prefix: If set, paths will be added if they start with prefix
- delimiter: If set, paths will be added if they end with delimiter
Returns:
Tuple with list of files, dirs and unknown items
"""

Async hook for SFTP operations using asyncssh, designed for high-performance asynchronous operations.
class SFTPHookAsync(BaseHook):
"""
Interact with an SFTP server via asyncssh package.
Provides asynchronous SFTP operations for improved performance
in concurrent scenarios and deferrable operations.
"""
conn_name_attr = "ssh_conn_id"
default_conn_name = "sftp_default"
conn_type = "sftp"
hook_name = "SFTP"
default_known_hosts = "~/.ssh/known_hosts"
def __init__(
self,
sftp_conn_id: str = default_conn_name,
host: str = "",
port: int = 22,
username: str = "",
password: str = "",
known_hosts: str = default_known_hosts,
key_file: str = "",
passphrase: str = "",
private_key: str = "",
) -> None:
"""
Initialize async SFTP hook.
Parameters:
- sftp_conn_id: SFTP connection ID to be used for connecting to SFTP server
- host: Hostname of the SFTP server
- port: Port of the SFTP server (default: 22)
- username: Username used when authenticating to the SFTP server
- password: Password used when authenticating to the SFTP server
- known_hosts: Path to the known_hosts file (default: ~/.ssh/known_hosts)
- key_file: Path to the client key file used for authentication
- passphrase: Passphrase used with the key_file for authentication
- private_key: Private key content as string
"""

async def list_directory(self, path: str = "") -> list[str] | None:
"""
Return a list of files on the SFTP server at the provided path.
Parameters:
- path: Path to list (default: current directory)
Returns:
Sorted list of filenames, or None if path doesn't exist
"""
async def read_directory(self, path: str = "") -> Sequence[asyncssh.sftp.SFTPName] | None:
"""
Return a list of files along with their attributes on the SFTP server.
Parameters:
- path: Path to list (default: current directory)
Returns:
Sequence of SFTPName objects with file attributes, or None if path doesn't exist
"""
async def get_files_and_attrs_by_pattern(
self, path: str = "", fnmatch_pattern: str = ""
) -> Sequence[asyncssh.sftp.SFTPName]:
"""
Get the files along with their attributes matching the pattern.
Parameters:
- path: Path to search in
- fnmatch_pattern: Pattern to match (e.g., "*.pdf")
Returns:
Sequence of SFTPName objects for files matching the pattern
Raises:
FileNotFoundError: If no files are found at the given path
"""
async def get_mod_time(self, path: str) -> str:
"""
Get last modified time for the file path.
Parameters:
- path: Full path to the remote file
Returns:
Last modification time as string in format YYYYMMDDHHMMSS
Raises:
AirflowException: If no files matching the given path are found
"""

from airflow.providers.sftp.hooks.sftp import SFTPHook
# Basic file operations: check for a remote file, download it, read its
# modification time, then upload a local file.

# Initialize hook
hook = SFTPHook(ssh_conn_id='my_sftp_conn')

# Check if file exists
if hook.path_exists('/remote/path/file.txt'):
    # Download file
    hook.retrieve_file('/remote/path/file.txt', '/local/path/file.txt')

    # Get file modification time (only meaningful when the file exists)
    mod_time = hook.get_mod_time('/remote/path/file.txt')
    print(f"File last modified: {mod_time}")

# Upload file
hook.store_file('/remote/path/new_file.txt', '/local/path/data.txt')

# Clean up connection
hook.close_conn()

from airflow.providers.sftp.hooks.sftp import SFTPHook
# Directory operations: list a remote directory, inspect per-file attributes,
# create a nested directory, and find files by fnmatch pattern.
hook = SFTPHook(ssh_conn_id='my_sftp_conn')

# List directory contents
files = hook.list_directory('/remote/path')
print(f"Found {len(files)} files")

# Get detailed file information
file_info = hook.describe_directory('/remote/path')
for filename, attrs in file_info.items():
    print(f"{filename}: {attrs['size']} bytes, {attrs['type']}")

# Create directory with parents
hook.create_directory('/remote/path/new/nested/dir')

# Find files by pattern
csv_files = hook.get_files_by_pattern('/remote/path', '*.csv')
print(f"Found CSV files: {csv_files}")

from airflow.providers.sftp.hooks.sftp import SFTPHookAsync
async def async_file_operations() -> None:
    """List a remote directory and look up modification times of PDF files asynchronously."""
    hook = SFTPHookAsync(sftp_conn_id='my_sftp_conn')

    # List files asynchronously; the result is None when the path does not exist
    files = await hook.list_directory('/remote/path')
    if files:
        print(f"Found {len(files)} files")

    # Get files matching pattern with attributes
    pdf_files = await hook.get_files_and_attrs_by_pattern(
        '/remote/path', '*.pdf'
    )
    for file in pdf_files:
        mod_time = await hook.get_mod_time(f'/remote/path/{file.filename}')
        print(f"{file.filename}: modified {mod_time}")

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-sftp