CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-sftp

SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/hooks.md

SFTP Hooks

Core connectivity and file system operations for SFTP servers. The SFTP provider includes both synchronous and asynchronous hooks to support different operational patterns within Airflow workflows.

Capabilities

Synchronous SFTP Hook

Main hook for SFTP operations using paramiko, providing comprehensive file system operations and connection management.

class SFTPHook(SSHHook):
    """
    Interact with SFTP servers using paramiko.
    
    Inherits from SSHHook and adds SFTP-specific functionality on top of it.
    Supports authentication via SSH keys or passwords and the common remote
    file system operations (listing, creating and deleting directories;
    transferring and deleting files).
    
    NOTE(review): connection pooling is advertised for this hook, but the
    mechanism is not visible in this stub -- confirm before relying on it.
    """
    
    # Name of the constructor argument that carries the Airflow connection ID.
    conn_name_attr = "ssh_conn_id"
    # Connection ID used when none is given (same as the ssh_conn_id default).
    default_conn_name = "sftp_default"
    # Connection type identifier for this hook.
    conn_type = "sftp"
    # Human-readable hook name (presumably shown in the Airflow UI).
    hook_name = "SFTP"
    
    def __init__(
        self,
        ssh_conn_id: str | None = "sftp_default",
        ssh_hook: SSHHook | None = None,
        *args,
        **kwargs,
    ) -> None:
        """
        Initialize the SFTP hook.
        
        Parameters:
        - ssh_conn_id: ID of the Airflow connection to use (default: "sftp_default")
        - ssh_hook: Optional existing SSH hook to reuse (deprecated)
        - *args, **kwargs: Extra arguments; presumably forwarded to
          SSHHook.__init__ (TODO confirm)
        """
    
    def get_conn(self) -> paramiko.SFTPClient:
        """Open an SFTP connection to the remote host and return the paramiko client."""
    
    def close_conn(self) -> None:
        """Close the SFTP connection opened by get_conn."""
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the SFTP connection.
        
        Returns:
        Tuple of (success, message)
        
        NOTE(review): the original summary said the test works "by calling
        path with directory"; the exact probe request is not visible here.
        """

UI Configuration

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """
    Describe how the Airflow connection form should look for SFTP connections.
    
    Used by the Airflow web UI when creating an SFTP connection, to decide
    which connection form fields to hide and which to relabel.
    
    Returns:
    Dictionary with the hidden-field list and the field relabeling mapping
    """

Connection Management

def get_conn(self) -> paramiko.SFTPClient:
    """
    Open an SFTP connection to the remote host.
    
    Returns:
    paramiko.SFTPClient instance used for all SFTP operations on this hook
    """

def close_conn(self) -> None:
    """Close the SFTP connection and release its resources."""

def test_connection(self) -> tuple[bool, str]:
    """
    Check that the SFTP connection can be established.
    
    Returns:
    Tuple of (success, message): success is True when the connection works,
    and message is a human-readable description of the outcome
    """

Directory Operations

def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None]]:
    """
    Describe the entries of a directory on the remote system.
    
    Parameters:
    - path: Full path to the remote directory
    
    Returns:
    Dictionary mapping each filename to a dictionary of its attributes
    ("size", "type" and "modify"); an attribute value may be None
    """

def list_directory(self, path: str) -> list[str]:
    """
    List the names of the entries in a directory on the remote system.
    
    Parameters:
    - path: Full path to the remote directory to list
    
    Returns:
    Sorted list of filenames
    """

def mkdir(self, path: str, mode: int = 0o777) -> None:
    """
    Create a single directory on the remote system.
    
    Unlike create_directory, this is not documented to create missing
    parent directories or to tolerate an already existing target.
    
    Parameters:
    - path: Full path to the remote directory to create
    - mode: Permissions as an octal int (default: 0o777)
    """

def create_directory(self, path: str, mode: int = 0o777) -> None:
    """
    Create a directory on the remote system, including any missing parents.
    
    Returns silently if the target directory already exists.
    
    Parameters:
    - path: Full path to the remote directory to create
    - mode: Permissions as an octal int (default: 0o777)
    """

def delete_directory(self, path: str) -> None:
    """
    Delete a directory on the remote system.
    
    NOTE(review): whether the directory must be empty (plain SFTP rmdir
    semantics) is not visible here -- confirm before deleting trees.
    
    Parameters:
    - path: Full path to the remote directory to delete
    """

File Operations

def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
    """
    Download a remote file to a local path.
    
    Parameters:
    - remote_full_path: Full path to the remote file to download
    - local_full_path: Full path of the local destination file
    - prefetch: Whether prefetching is performed during the transfer
      (default: True); presumably passed to paramiko's download call -- TODO confirm
    """

def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None:
    """
    Upload a local file to a remote path.
    
    Parameters:
    - remote_full_path: Full path of the remote destination file
    - local_full_path: Full path to the local file to upload
    - confirm: Whether to confirm the transfer (default: True); presumably a
      post-upload size check -- TODO confirm
    """

def delete_file(self, path: str) -> None:
    """
    Remove a file from the remote server.
    
    Parameters:
    - path: Full path to the remote file
    """

def get_mod_time(self, path: str) -> str:
    """
    Get the modification time of a remote entry.
    
    Parameters:
    - path: Full path to the remote file
    
    Returns:
    Modification time as a string in the format YYYYMMDDHHMMSS
    (NOTE(review): the time zone of this timestamp is not stated -- confirm)
    """

Path Utilities

def isdir(self, path: str) -> bool:
    """
    Check whether a remote path is a directory.
    
    Parameters:
    - path: Full path to the remote entry to check
    
    Returns:
    True if the path is a directory, False otherwise
    """

def isfile(self, path: str) -> bool:
    """
    Check whether a remote path is a regular file.
    
    Parameters:
    - path: Full path to the remote entry to check
    
    Returns:
    True if the path is a file, False otherwise
    """

def path_exists(self, path: str) -> bool:
    """
    Check whether a remote file or directory exists.
    
    Parameters:
    - path: Full path to the remote file or directory
    
    Returns:
    True if the path exists, False otherwise
    """

Pattern Matching

def get_file_by_pattern(self, path: str, fnmatch_pattern: str) -> str:
    """
    Get the first file in a remote directory matching an fnmatch-style pattern.
    
    Parameters:
    - path: Remote directory to search
    - fnmatch_pattern: Pattern matched against each filename with fnmatch (e.g. "*.csv")
    
    Returns:
    Name of the first matching file, or an empty string if nothing matched
    """

def get_files_by_pattern(self, path: str, fnmatch_pattern: str) -> list[str]:
    """
    Get every file in a remote directory matching an fnmatch-style pattern.
    
    Parameters:
    - path: Remote directory to search
    - fnmatch_pattern: Pattern matched against each filename with fnmatch (e.g. "*.csv")
    
    Returns:
    List of matching filenames, or an empty list if nothing matched
    """

@staticmethod
def _is_path_match(path: str, prefix: str | None = None, delimiter: str | None = None) -> bool:
    """
    Check whether a path starts with prefix (if set) and ends with delimiter (if set).
    
    Internal helper used by get_tree_map to filter collected paths.
    
    Parameters:
    - path: Path to be checked
    - prefix: If set, the path must start with this string
    - delimiter: If set, the path must end with this string
    
    Returns:
    True if the path satisfies every criterion that is set, False otherwise
    """

Tree Walking

def walktree(
    self,
    path: str,
    fcallback: Callable[[str], Any | None],
    dcallback: Callable[[str], Any | None],
    ucallback: Callable[[str], Any | None],
    recurse: bool = True,
) -> None:
    """
    Walk the remote directory tree at path, depth first.
    
    Each entry's path is passed to one of three callbacks, depending on
    whether it is a regular file, a directory, or of unknown type.
    
    Parameters:
    - path: Root of the remote directory to walk; use '.' to start at the current directory
    - fcallback: Called with the path of each regular file
    - dcallback: Called with the path of each directory
    - ucallback: Called with the path of each entry of unknown type
    - recurse: Whether to descend into subdirectories (default: True)
    """

def get_tree_map(
    self, path: str, prefix: str | None = None, delimiter: str | None = None
) -> tuple[list[str], list[str], list[str]]:
    """
    Recursively collect the files, directories and unknown entries under path.
    
    Results can be filtered with prefix and/or delimiter (see _is_path_match).
    
    Parameters:
    - path: Remote path from which the tree is built
    - prefix: If set, only paths starting with this string are included
    - delimiter: If set, only paths ending with this string are included
    
    Returns:
    Tuple of (files, directories, unknown entries), each a list of paths
    """

Asynchronous SFTP Hook

Async hook for SFTP operations using asyncssh. It is intended for non-blocking use, such as concurrent transfers and deferrable operators or triggers.

class SFTPHookAsync(BaseHook):
    """
    Interact with an SFTP server via the asyncssh package.
    
    Provides asynchronous SFTP operations for concurrent scenarios and for
    deferrable operations.
    """
    
    # NOTE(review): conn_name_attr says "ssh_conn_id", but the constructor
    # argument is named sftp_conn_id -- confirm which name callers should use.
    conn_name_attr = "ssh_conn_id"
    # Connection ID used when none is given (default for sftp_conn_id).
    default_conn_name = "sftp_default"
    # Connection type identifier for this hook.
    conn_type = "sftp"
    # Human-readable hook name (presumably shown in the Airflow UI).
    hook_name = "SFTP"
    # Default known_hosts file location (default for the known_hosts argument).
    default_known_hosts = "~/.ssh/known_hosts"
    
    def __init__(
        self,
        sftp_conn_id: str = default_conn_name,
        host: str = "",
        port: int = 22,
        username: str = "",
        password: str = "",
        known_hosts: str = default_known_hosts,
        key_file: str = "",
        passphrase: str = "",
        private_key: str = "",
    ) -> None:
        """
        Initialize the async SFTP hook.
        
        Parameters:
        - sftp_conn_id: Airflow connection ID used to connect to the SFTP server
          (default: "sftp_default")
        - host: Hostname of the SFTP server
        - port: Port of the SFTP server (default: 22)
        - username: Username for authentication
        - password: Password for authentication
        - known_hosts: Path to the known_hosts file (default: ~/.ssh/known_hosts)
        - key_file: Path to the client private key file used for authentication
        - passphrase: Passphrase that unlocks key_file
        - private_key: Private key content as a string
        
        NOTE(review): how these explicit values combine with the values stored
        in the Airflow connection (which one wins) is not visible here.
        """

Asynchronous Operations

async def list_directory(self, path: str = "") -> list[str] | None:
    """
    Asynchronously list the filenames at a path on the SFTP server.
    
    Parameters:
    - path: Remote path to list (default: "", the current directory)
    
    Returns:
    Sorted list of filenames, or None if the path does not exist
    """

async def read_directory(self, path: str = "") -> Sequence[asyncssh.sftp.SFTPName] | None:
    """
    Asynchronously list the entries at a path, including their attributes.
    
    Parameters:
    - path: Remote path to list (default: "", the current directory)
    
    Returns:
    Sequence of SFTPName objects (name plus attributes), or None if the
    path does not exist
    """

async def get_files_and_attrs_by_pattern(
    self, path: str = "", fnmatch_pattern: str = ""
) -> Sequence[asyncssh.sftp.SFTPName]:
    """
    Asynchronously get the entries at path whose names match an fnmatch pattern.
    
    Parameters:
    - path: Remote path to search
    - fnmatch_pattern: Pattern matched against each filename (e.g. "*.pdf")
    
    Returns:
    Sequence of SFTPName objects (name plus attributes) for matching files
    
    Raises:
    FileNotFoundError: If no files are found at path
    """

async def get_mod_time(self, path: str) -> str:
    """
    Asynchronously get the last modification time of a remote file.
    
    Parameters:
    - path: Full path to the remote file
    
    Returns:
    Last modification time as a string in the format YYYYMMDDHHMMSS
    
    Raises:
    AirflowException: If no file matching path is found
    """

Usage Examples

Basic File Operations

from airflow.providers.sftp.hooks.sftp import SFTPHook

remote_file = '/remote/path/file.txt'

# Initialize hook
hook = SFTPHook(ssh_conn_id='my_sftp_conn')

try:
    # Check if file exists
    if hook.path_exists(remote_file):
        # Download file
        hook.retrieve_file(remote_file, '/local/path/file.txt')

        # Get file modification time (string formatted as YYYYMMDDHHMMSS)
        mod_time = hook.get_mod_time(remote_file)
        print(f"File last modified: {mod_time}")

    # Upload file
    hook.store_file('/remote/path/new_file.txt', '/local/path/data.txt')
finally:
    # Clean up the connection even if a transfer above raised.
    hook.close_conn()

Directory Operations

from airflow.providers.sftp.hooks.sftp import SFTPHook

hook = SFTPHook(ssh_conn_id='my_sftp_conn')

try:
    # List directory contents
    files = hook.list_directory('/remote/path')
    print(f"Found {len(files)} files")

    # Get detailed file information ("size", "type" and "modify" per entry;
    # any of these values may be None)
    file_info = hook.describe_directory('/remote/path')
    for filename, attrs in file_info.items():
        print(f"{filename}: {attrs['size']} bytes, {attrs['type']}")

    # Create directory with parents (no error if it already exists)
    hook.create_directory('/remote/path/new/nested/dir')

    # Find files by pattern
    csv_files = hook.get_files_by_pattern('/remote/path', '*.csv')
    print(f"Found CSV files: {csv_files}")
finally:
    # Release the SFTP connection even if one of the operations above failed.
    hook.close_conn()

Asynchronous Operations

import asyncio

from airflow.providers.sftp.hooks.sftp import SFTPHookAsync


async def async_file_operations():
    """List /remote/path and print the modification time of each PDF file in it."""
    hook = SFTPHookAsync(sftp_conn_id='my_sftp_conn')

    # List files asynchronously; None means the path does not exist.
    files = await hook.list_directory('/remote/path')
    if not files:
        # get_files_and_attrs_by_pattern raises FileNotFoundError when no
        # files are found at the path, so stop here instead of crashing.
        print("No files found")
        return
    print(f"Found {len(files)} files")

    # Get files matching pattern with attributes
    pdf_files = await hook.get_files_and_attrs_by_pattern(
        '/remote/path', '*.pdf'
    )

    for file in pdf_files:
        mod_time = await hook.get_mod_time(f'/remote/path/{file.filename}')
        print(f"{file.filename}: modified {mod_time}")


# Run the coroutine when this example is executed as a standalone script.
# Inside Airflow (e.g. from a trigger), await it from the running event loop instead.
if __name__ == "__main__":
    asyncio.run(async_file_operations())

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-sftp

docs

decorators.md

hooks.md

index.md

operators.md

sensors.md

triggers.md

tile.json