
tessl/pypi-apache-airflow-providers-sftp

SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations


docs/sensors.md

File Monitoring

Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers. SFTPSensor covers existence checks, fnmatch pattern matching, and modification-time conditions, and supports both blocking and deferrable execution modes.

Capabilities

SFTP Sensor

Main sensor for monitoring file and directory presence on SFTP servers with extensive filtering and condition checking capabilities.

class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present on SFTP.
    
    Monitors SFTP locations for file presence, pattern matching, and modification
    time conditions. Supports both synchronous polling and asynchronous deferrable
    execution for efficient resource utilization.
    """
    
    template_fields: Sequence[str] = ("path", "newer_than")
    
    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        python_callable: Callable | None = None,
        op_args: list | None = None,
        op_kwargs: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        """
        Initialize SFTP sensor.
        
        Parameters:
        - path: Remote file or directory path (templated)
        - file_pattern: fnmatch-style pattern for matching files within path
        - newer_than: Only consider files modified after this datetime (templated)
        - sftp_conn_id: Connection to run the sensor against (default: sftp_default)
        - python_callable: Optional callable to execute when files are found
        - op_args: Positional arguments for python_callable
        - op_kwargs: Keyword arguments for python_callable
        - deferrable: Run in deferrable mode, releasing the worker slot while
          waiting (default from operators.default_deferrable, falling back to False)
        """
    
    def poke(self, context: Context) -> PokeReturnValue | bool:
        """Check for file existence and conditions."""
    
    def execute(self, context: Context) -> Any:
        """Execute the sensor, either synchronously or by deferring to trigger."""
    
    def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
        """Execute callback when the trigger fires; returns immediately."""

File Existence Monitoring

def poke(self, context: Context) -> PokeReturnValue | bool:
    """
    Check file conditions and return status.
    
    Performs the core sensing logic including file existence checks,
    pattern matching, modification time comparisons, and optional
    python callable execution.
    
    Parameters:
    - context: Airflow task execution context
    
    Returns:
    PokeReturnValue with completion status and XCom values, or boolean
    indicating whether conditions are met
    """

Synchronous and Asynchronous Execution

def execute(self, context: Context) -> Any:
    """
    Execute the sensor, either synchronously or by deferring to trigger.
    
    When deferrable=False, uses traditional polling approach.
    When deferrable=True, defers to SFTPTrigger for async monitoring.
    
    Parameters:
    - context: Airflow task execution context
    
    Returns:
    Sensor result or defers to trigger for async execution
    """

def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
    """
    Execute callback when the trigger fires.
    
    Called when deferrable sensor completes via trigger.
    Processes trigger results and returns immediately.
    
    Parameters:
    - context: Airflow task execution context
    - event: Event data from the trigger
    """

Usage Examples

Basic File Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Wait for a specific file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_data_file',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_default',
    timeout=3600,  # Wait up to 1 hour
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

Pattern-Based File Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for any CSV file matching a pattern
wait_for_csv_files = SFTPSensor(
    task_id='wait_for_csv_files',
    path='/remote/incoming',
    file_pattern='daily_report_*.csv',  # Match files like daily_report_20230101.csv
    sftp_conn_id='sftp_default',
    timeout=7200,  # Wait up to 2 hours
    poke_interval=600,  # Check every 10 minutes
    dag=dag
)

# Wait for recent files with a date-based condition
# Note: file_pattern is not a templated field (only path and newer_than are),
# so date-based selection is better expressed via newer_than
wait_for_dated_files = SFTPSensor(
    task_id='wait_for_dated_files',
    path='/remote/exports',
    file_pattern='export_*.json',
    newer_than='{{ ds }}T00:00:00',  # Only files from the execution date onward
    sftp_conn_id='sftp_default',
    dag=dag
)

Modification Time Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_mod_time',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

# Wait for file newer than a specific time
wait_for_recent_file = SFTPSensor(
    task_id='wait_for_recent_file',
    path='/remote/data/latest.csv',
    newer_than='2023-01-01T00:00:00',  # ISO format string
    sftp_conn_id='sftp_default',
    dag=dag
)

# Wait for file newer than task execution time
wait_for_fresh_file = SFTPSensor(
    task_id='wait_for_fresh_file',
    path='/remote/data/hourly.json',
    newer_than='{{ ts }}',  # Templated to task execution time
    sftp_conn_id='sftp_default',
    dag=dag
)

# Wait for file newer than yesterday
wait_for_daily_update = SFTPSensor(
    task_id='wait_for_daily_update',
    path='/remote/reports',
    file_pattern='daily_*.csv',
    newer_than='{{ yesterday_ds }}T00:00:00',  # Yesterday at midnight
    sftp_conn_id='sftp_default',
    dag=dag
)

Custom Processing with Python Callable

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

def process_found_files(extra_param=None):
    """Custom processing to run once matching files are found.

    The sensor invokes python_callable with op_args/op_kwargs only; the
    list of matched files is pushed via the sensor's XCom value rather
    than passed as an argument.
    """
    print(f"Running custom processing (extra_param={extra_param})")
    # Add custom validation or processing logic here
    return {"status": "success"}

dag = DAG(
    'sftp_sensor_callable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2)
)

sensor_with_processing = SFTPSensor(
    task_id='sensor_with_processing',
    path='/remote/incoming',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    python_callable=process_found_files,
    op_kwargs={'extra_param': 'custom_value'},
    dag=dag
)

Deferrable Sensor for Resource Efficiency

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_sensor_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Use deferrable sensor to free up worker slots while waiting
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/large_files',
    file_pattern='bigdata_*.parquet',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Use async trigger instead of blocking
    timeout=14400,  # Wait up to 4 hours
    dag=dag
)

Complex File Monitoring Workflow

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def validate_files(**context):
    """Validate files before processing.

    Note: SFTPSensor only pushes file details to XCom when poke returns a
    PokeReturnValue (e.g. when python_callable is set); on a plain success
    it pushes nothing, so this pull may return None.
    """
    sensor_xcom = context['task_instance'].xcom_pull(task_ids='wait_for_source_files')
    print(f"Sensor XCom: {sensor_xcom}")
    # Add validation logic
    return True

dag = DAG(
    'sftp_complex_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for files with a specific pattern and recency
# (file_pattern is not templated; date filtering is handled by newer_than)
wait_for_source_files = SFTPSensor(
    task_id='wait_for_source_files',
    path='/remote/daily_exports',
    file_pattern='export_*.csv',
    newer_than='{{ ds }}T06:00:00',  # Files newer than 6 AM on execution date
    sftp_conn_id='sftp_source',
    timeout=10800,  # 3 hours timeout
    poke_interval=900,  # Check every 15 minutes
    dag=dag
)

# Download a file once it's available
# (SFTPOperator expects concrete paths; it does not expand wildcards)
download_files = SFTPOperator(
    task_id='download_files',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/{{ ds }}/export_{{ ds_nodash }}.csv',
    remote_filepath='/remote/daily_exports/export_{{ ds_nodash }}.csv',
    operation=SFTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

# Validate downloaded files
validate = PythonOperator(
    task_id='validate_files',
    python_callable=validate_files,
    dag=dag
)

# Wait for processing completion signal
wait_for_completion = SFTPSensor(
    task_id='wait_for_completion',
    path='/remote/status/processing_complete_{{ ds_nodash }}.flag',
    sftp_conn_id='sftp_source',
    timeout=7200,  # 2 hours for processing
    dag=dag
)

wait_for_source_files >> download_files >> validate >> wait_for_completion

Directory Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_directory_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

# Monitor for any file in a directory
wait_for_any_file = SFTPSensor(
    task_id='wait_for_any_file',
    path='/remote/incoming',
    file_pattern='*',  # Match any file
    sftp_conn_id='sftp_default',
    dag=dag
)

# Monitor for specific file types
wait_for_json_files = SFTPSensor(
    task_id='wait_for_json_files',
    path='/remote/api_exports',
    file_pattern='*.json',
    newer_than='{{ ds }}T00:00:00',  # Today's files only
    sftp_conn_id='sftp_default',
    dag=dag
)

Error Handling and Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG(
    'sftp_sensor_monitoring',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Critical file monitoring with failure notifications
critical_file_sensor = SFTPSensor(
    task_id='critical_file_sensor',
    path='/remote/critical/daily_feed.csv',
    newer_than='{{ ds }}T07:00:00',  # Must be from today after 7 AM
    sftp_conn_id='sftp_critical',
    timeout=7200,  # 2 hour timeout
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Send alert if sensor fails
failure_alert = EmailOperator(
    task_id='failure_alert',
    to=['ops@company.com'],
    subject='Critical SFTP File Missing - {{ ds }}',
    html_content='''
    <h3>Alert: Critical SFTP File Missing</h3>
    <p>The daily feed file was not found within the expected timeframe.</p>
    <p>Execution Date: {{ ds }}</p>
    <p>Please check the SFTP server and data pipeline.</p>
    ''',
    trigger_rule='one_failed',  # Trigger on sensor failure
    dag=dag
)

critical_file_sensor >> failure_alert

Best Practices

Performance Optimization

  • Use appropriate poke_interval values to balance responsiveness with server load
  • Set reasonable timeout values based on expected file arrival patterns
  • Use deferrable=True for long-running sensors to free up worker slots
  • Consider file pattern specificity to reduce unnecessary checks
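As a rough sizing aid for the first two points (plain arithmetic, not provider API): with a fixed poke_interval, the timeout bounds how many checks a blocking sensor makes, so the two values should be chosen together.

```python
# With poke_interval=300s and timeout=3600s, a blocking sensor makes at
# most timeout // poke_interval checks before timing out.
poke_interval = 300   # seconds between checks
timeout = 3600        # total seconds to wait
max_checks = timeout // poke_interval
print(max_checks)  # 12
```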

Resource Management

  • Configure sensor pools to limit concurrent SFTP connections
  • Use connection pooling for sensors monitoring the same SFTP server
  • Monitor sensor task duration and adjust timeouts accordingly
  • Implement sensor retries with exponential backoff for transient failures

Monitoring and Alerting

  • Set up alerts for sensor timeout failures
  • Monitor sensor execution patterns to optimize scheduling
  • Use XCom to pass file information to downstream tasks
  • Implement custom logging for sensor status tracking

Pattern Matching

  • Use specific patterns to avoid false positives
  • Test fnmatch patterns thoroughly with expected file names
  • Consider using templated patterns for date-based file monitoring
  • Document pattern expectations for team maintenance
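Patterns follow Python's fnmatch rules, so they can be tested locally before deploying a sensor. The filenames below are made up for illustration; note how a specific suffix keeps in-flight `.tmp` uploads from triggering the sensor.

```python
import fnmatch

candidates = [
    "daily_report_20230101.csv",
    "daily_report_20230101.csv.tmp",   # partial upload, should not match
    "monthly_report_20230101.csv",     # wrong prefix, should not match
]

# Require the exact prefix and suffix to avoid false positives.
matches = [name for name in candidates if fnmatch.fnmatch(name, "daily_report_*.csv")]
print(matches)  # ['daily_report_20230101.csv']
```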

Time-based Conditions

  • Use UTC timestamps consistently across all sensors
  • Account for timezone differences between Airflow and SFTP servers
  • Implement buffer times for file processing delays
  • Consider file system timestamp precision limitations
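The UTC and buffer advice above can be combined when computing a newer_than value outside of templating. The due time and buffer below are made-up examples.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical cutoff: the feed is due at 07:00 UTC; allow a 5-minute
# buffer for upload delays before treating a file as stale.
due = datetime(2023, 1, 1, 7, 0, tzinfo=timezone.utc)
buffer = timedelta(minutes=5)
newer_than = (due - buffer).isoformat()
print(newer_than)  # 2023-01-01T06:55:00+00:00
```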

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-sftp
