tessl/pypi-apache-airflow-providers-sftp

SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations

File Transfer Operations

Task execution components for uploading and downloading files between local and remote SFTP locations. The SFTP operator provides robust file transfer capabilities with support for batch operations, intermediate directory creation, and comprehensive error handling.

Capabilities

SFTP Operation Constants

Operation type constants for specifying the direction of file transfers.

class SFTPOperation:
    """Operation that can be used with SFTP."""
    
    PUT = "put"  # Upload operation from local to remote
    GET = "get"  # Download operation from remote to local

SFTP Operator

Main operator for transferring files between local and remote SFTP locations.

class SFTPOperator(BaseOperator):
    """
    SFTPOperator for transferring files from remote host to local or vice versa.
    
    This operator uses SFTP hook to open SFTP transport channel that serves as
    basis for file transfer operations. Supports both single file and batch
    file transfers with comprehensive configuration options.
    """
    
    template_fields: Sequence[str] = ("local_filepath", "remote_filepath", "remote_host")
    
    def __init__(
        self,
        *,
        ssh_hook: SSHHook | None = None,
        sftp_hook: SFTPHook | None = None,
        ssh_conn_id: str | None = None,
        remote_host: str | None = None,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        confirm: bool = True,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize SFTP operator.
        
        Parameters:
        - ssh_conn_id: SSH connection ID from Airflow connections (used when no hook is supplied)
        - sftp_hook: Predefined SFTPHook to use; either sftp_hook or ssh_conn_id must be provided
        - ssh_hook: Deprecated - predefined SSHHook to use (use sftp_hook instead)
        - remote_host: Remote host to connect (templated)
        - local_filepath: Local file path or list of paths to get or put (templated)
        - remote_filepath: Remote file path or list of paths to get or put (templated)
        - operation: Specify operation 'get' or 'put' (default: put)
        - confirm: Specify if SFTP operation should be confirmed (default: True)
        - create_intermediate_dirs: Create missing intermediate directories (default: False)
        """
    
    def execute(self, context: Any) -> str | list[str] | None:
        """Execute the file transfer operation."""
    
    def get_openlineage_facets_on_start(self):
        """Return OpenLineage datasets for lineage tracking."""

File Transfer Execution

def execute(self, context: Any) -> str | list[str] | None:
    """
    Execute the file transfer operation (PUT or GET).
    
    Validates file path arrays, establishes SFTP connection, and performs
    the specified transfer operation with optional intermediate directory creation.
    
    Parameters:
    - context: Airflow task execution context
    
    Returns:
    Filepath or list of filepaths that were transferred, or None
    
    Raises:
    ValueError: If local_filepath and remote_filepath arrays have different lengths
    TypeError: If operation is not 'get' or 'put'
    AirflowException: If both ssh_hook and sftp_hook are defined, or transfer fails
    """

Data Lineage Integration

def get_openlineage_facets_on_start(self):
    """
    Return OpenLineage datasets for data lineage tracking.
    
    Creates dataset facets for both local and remote file locations
    to enable lineage tracking in OpenLineage-compatible systems.
    
    Returns:
    OpenLineage facets containing input and output datasets
    """

Usage Examples

Basic File Upload

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime

dag = DAG(
    'sftp_upload_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

upload_task = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/data/report.csv',
    remote_filepath='/remote/uploads/report.csv',
    operation=SFTPOperation.PUT,
    dag=dag
)

Basic File Download

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime

dag = DAG(
    'sftp_download_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

download_task = SFTPOperator(
    task_id='download_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/downloads/data.csv',
    remote_filepath='/remote/exports/data.csv',
    operation=SFTPOperation.GET,
    dag=dag
)

Batch File Transfer

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

dag = DAG(
    'sftp_batch_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Upload multiple files
batch_upload = SFTPOperator(
    task_id='batch_upload',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv'
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv'
    ],
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag
)

# Download multiple files
batch_download = SFTPOperator(
    task_id='batch_download',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/downloads/result1.json',
        '/local/downloads/result2.json'
    ],
    remote_filepath=[
        '/remote/results/result1.json',
        '/remote/results/result2.json'
    ],
    operation=SFTPOperation.GET,
    dag=dag
)

Advanced Configuration

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime

dag = DAG(
    'sftp_advanced_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

# Using predefined hook for custom configuration
custom_hook = SFTPHook(ssh_conn_id='sftp_custom')

advanced_transfer = SFTPOperator(
    task_id='advanced_transfer',
    sftp_hook=custom_hook,  # Use predefined hook
    remote_host='custom.sftp.server.com',  # Override connection host
    local_filepath='/local/data/{{ ds }}/report.csv',  # Templated path
    remote_filepath='/remote/daily/{{ ds }}/report.csv',  # Templated path
    operation=SFTPOperation.PUT,
    confirm=True,  # Confirm successful transfer
    create_intermediate_dirs=True,  # Create date-based directories
    dag=dag
)

Error Handling and Retries

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30)
}

dag = DAG(
    'sftp_resilient_example',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

resilient_transfer = SFTPOperator(
    task_id='resilient_transfer',
    ssh_conn_id='sftp_prod',
    local_filepath='/local/critical/data.parquet',
    remote_filepath='/remote/warehouse/data.parquet',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    # Airflow retries (configured in default_args above) handle transient connection or transfer failures
    dag=dag
)

Integration with Sensors

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Wait for source file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_source',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_source',
    timeout=3600,  # Wait up to 1 hour
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Download the file once it's available
download_file = SFTPOperator(
    task_id='download_data',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/data.csv',
    remote_filepath='/remote/incoming/data.csv',
    operation=SFTPOperation.GET,
    dag=dag
)

# Upload processed file to different server
upload_processed = SFTPOperator(
    task_id='upload_processed',
    ssh_conn_id='sftp_destination',
    local_filepath='/local/processed/data.csv',
    remote_filepath='/remote/processed/data.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

wait_for_file >> download_file >> upload_processed

Dynamic File Paths with Templating

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta

dag = DAG(
    'sftp_templating_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

templated_transfer = SFTPOperator(
    task_id='templated_transfer',
    ssh_conn_id='sftp_default',
    # Use Airflow templating for dynamic paths
    local_filepath='/local/data/{{ ds }}/export_{{ ts_nodash }}.csv',
    remote_filepath='/remote/daily/{{ ds }}/export_{{ ts_nodash }}.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

Best Practices

Connection Management

  • Manage credentials centrally through Airflow connections rather than hard-coding them (see the sketch after this list)
  • Prefer sftp_hook parameter over deprecated ssh_hook
  • Use connection IDs consistently across tasks
  • Configure timeouts appropriately for large file transfers
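
One way to register the sftp_default connection outside the UI is through Airflow's AIRFLOW_CONN_* environment variables; the host, user, and key path below are placeholders, and the key_file extra assumes key-based authentication.

import os

# Illustrative only: Airflow resolves connections from AIRFLOW_CONN_<CONN_ID>
# environment variables. All values here are placeholders.
os.environ["AIRFLOW_CONN_SFTP_DEFAULT"] = (
    "sftp://transfer_user@sftp.example.com:22/"
    "?key_file=%2Fhome%2Fairflow%2F.ssh%2Fid_rsa"
)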

File Path Handling

  • Use absolute paths for both local and remote file paths
  • Enable create_intermediate_dirs=True for dynamic directory structures
  • Validate file path arrays have matching lengths for batch operations
  • Use templating for date-based or dynamic file naming

Error Handling

  • Configure appropriate retry policies for network-dependent operations
  • Use confirm=True to verify successful transfers
  • Implement downstream validation of transferred files (see the sketch after this list)
  • Monitor transfer logs for performance optimization opportunities
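
A small sketch of the downstream-validation point above; the @task helper, file path, and wiring are illustrative and assume the validation task runs on the same worker filesystem as the transfer.

import os

from airflow.decorators import task

@task
def validate_download(path: str = "/local/staging/data.csv") -> None:
    """Fail fast if the transferred file is missing or empty."""
    if not os.path.isfile(path) or os.path.getsize(path) == 0:
        raise ValueError(f"Transferred file missing or empty: {path}")

# Example wiring, assuming download_file is the GET task from the sensor example above:
# download_file >> validate_download()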

Performance Optimization

  • Use batch operations for multiple files instead of individual tasks
  • Consider file size limitations for single transfers
  • Implement prefetching controls for large file downloads
  • Use async hooks for I/O intensive workflows where appropriate (see the sketch after this list)
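
For the async point above, one option (depending on the installed provider version) is to wait for remote files with the SFTP sensor in deferrable mode, which hands polling to the triggerer via the provider's async hook; treat the deferrable flag as an assumption about your provider version.

from airflow.providers.sftp.sensors.sftp import SFTPSensor

# Hedged sketch: deferrable mode is only available in newer provider releases.
wait_async = SFTPSensor(
    task_id="wait_for_large_export",
    sftp_conn_id="sftp_source",
    path="/remote/exports/large_file.parquet",
    poke_interval=300,
    deferrable=True,  # poll from the triggerer instead of occupying a worker slot
)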

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-sftp
