tessl/pypi-apache-airflow-providers-sftp

SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations

docs/triggers.md

Asynchronous Triggers

Trigger components for deferrable SFTP operations. By yielding control to the triggerer during long-running file monitoring, these triggers free worker slots and make efficient use of cluster resources. The SFTP trigger provides asynchronous monitoring of remote file systems for deferrable workflows.

Capabilities

SFTP Trigger

Asynchronous trigger for deferrable SFTP file monitoring operations.

class SFTPTrigger(BaseTrigger):
    """
    SFTPTrigger that fires when file conditions are met on SFTP server.
    
    Provides asynchronous monitoring of SFTP locations for file presence,
    pattern matching, and modification time conditions. Designed for use
    with deferrable sensors to optimize resource utilization.
    """
    
    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None:
        """
        Initialize SFTP trigger.
        
        Parameters:
        - path: Path on SFTP server to search for files
        - file_pattern: Pattern to match against file list using fnmatch
        - sftp_conn_id: SFTP connection ID for connecting to server
        - newer_than: DateTime threshold for file modification time filtering
        - poke_interval: How often, in seconds, to check for file existence
        """
    
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize SFTPTrigger arguments and classpath."""
    
    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Make asynchronous calls to SFTP server and yield trigger events."""

Trigger Serialization

def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize SFTPTrigger arguments and classpath.
    
    Required for trigger persistence and recovery across Airflow restarts.
    Returns the trigger class path and initialization parameters.
    
    Returns:
    Tuple containing:
    - Class path string for trigger reconstruction
    - Dictionary of initialization parameters
    """

Asynchronous Monitoring

async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Make asynchronous calls to SFTP server and yield trigger events.
    
    Continuously monitors SFTP server for file conditions using SFTPHookAsync.
    Handles different monitoring scenarios:
    - Direct file path monitoring when no pattern is specified
    - Pattern-based file matching when file_pattern is provided
    - Modification time filtering when newer_than is specified
    
    Yields:
    TriggerEvent objects indicating success/failure and found files
    
    Raises:
    AirflowException: For connection failures or configuration errors
    """

Usage Examples

Basic Deferrable File Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_deferrable_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Deferrable sensor automatically uses SFTPTrigger
deferrable_sensor = SFTPSensor(
    task_id='wait_for_file',
    path='/remote/data/important_file.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Automatically uses SFTPTrigger
    timeout=3600,  # 1 hour timeout
    dag=dag
)

Pattern-Based Monitoring with Triggers

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_deferrable_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Monitor for pattern-matched files asynchronously
pattern_sensor = SFTPSensor(
    task_id='wait_for_daily_files',
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Uses SFTPTrigger internally
    timeout=14400,  # 4 hour timeout
    dag=dag
)

Custom Trigger Usage (Advanced)

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator
from datetime import datetime, timedelta

class CustomSFTPSensor(BaseSensorOperator):
    """Custom sensor using SFTPTrigger directly."""
    
    def __init__(self, sftp_path, sftp_conn_id='sftp_default', **kwargs):
        super().__init__(**kwargs)
        self.sftp_path = sftp_path
        self.sftp_conn_id = sftp_conn_id
    
    def execute(self, context):
        """Defer to custom trigger configuration."""
        self.defer(
            trigger=SFTPTrigger(
                path=self.sftp_path,
                file_pattern="*.csv",
                sftp_conn_id=self.sftp_conn_id,
                poke_interval=10.0,  # Custom interval
            ),
            method_name="execute_complete"
        )
    
    def execute_complete(self, context, event=None):
        """Handle trigger completion; the event payload carries 'status' and 'message'."""
        if event and event["status"] == "success":
            self.log.info("Trigger succeeded: %s", event["message"])
            return event["message"]
        raise AirflowException(f"Trigger failed: {event}")

dag = DAG(
    'custom_sftp_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2)
)

custom_sensor = CustomSFTPSensor(
    task_id='custom_sftp_monitor',
    sftp_path='/remote/custom_data',
    sftp_conn_id='sftp_custom',
    dag=dag
)

High-Frequency Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_high_frequency',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=15),
    max_active_runs=5  # Allow multiple concurrent runs
)

# High-frequency monitoring with short poke intervals
high_freq_sensor = SFTPSensor(
    task_id='high_freq_monitor',
    path='/remote/realtime_data',
    file_pattern='realtime_*.json',
    newer_than='{{ ts }}',  # Only files newer than task start
    sftp_conn_id='sftp_realtime',
    deferrable=True,
    timeout=900,  # 15 minutes timeout
    # Note: poke_interval set on the sensor is forwarded to the trigger
    dag=dag
)

Multiple File Pattern Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_multiple_patterns(**context):
    """Process results from multiple pattern sensors."""
    ti = context['task_instance']
    csv_files = ti.xcom_pull(task_ids='wait_for_csv')
    json_files = ti.xcom_pull(task_ids='wait_for_json')
    xml_files = ti.xcom_pull(task_ids='wait_for_xml')
    
    all_files = []
    for result in (csv_files, json_files, xml_files):
        if isinstance(result, dict):
            all_files.extend(result.get('files_found', []))
    
    print(f"Found {len(all_files)} total files to process")
    return all_files

dag = DAG(
    'sftp_multiple_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

# Monitor for CSV files
csv_sensor = SFTPSensor(
    task_id='wait_for_csv',
    path='/remote/exports',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag
)

# Monitor for JSON files
json_sensor = SFTPSensor(
    task_id='wait_for_json',
    path='/remote/exports',
    file_pattern='*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag
)

# Monitor for XML files
xml_sensor = SFTPSensor(
    task_id='wait_for_xml',
    path='/remote/exports',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag
)

# Process all found files
process_files = PythonOperator(
    task_id='process_all_files',
    python_callable=process_multiple_patterns,
    dag=dag
)

[csv_sensor, json_sensor, xml_sensor] >> process_files

Time-Based Trigger Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

dag = DAG(
    'sftp_time_based_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for files newer than specific time with deferrable execution
time_based_sensor = SFTPSensor(
    task_id='wait_for_recent_files',
    path='/remote/time_sensitive',
    file_pattern='data_*.parquet',
    newer_than='{{ ds }}T08:00:00',  # Files from today after 8 AM
    sftp_conn_id='sftp_default',
    deferrable=True,
    timeout=28800,  # 8 hour timeout
    dag=dag
)

Error Handling with Triggers

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

def handle_sensor_success(**context):
    """Handle successful file detection."""
    task_instance = context['task_instance']
    sensor_result = task_instance.xcom_pull(task_ids='deferrable_file_sensor')
    
    if isinstance(sensor_result, dict) and 'files_found' in sensor_result:
        files = sensor_result['files_found']
        print(f"Successfully found {len(files)} files: {files}")
        return {"status": "success", "file_count": len(files)}
    else:
        print(f"Sensor completed with result: {sensor_result}")
        return {"status": "completed", "result": sensor_result}

dag = DAG(
    'sftp_trigger_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

# Deferrable sensor with comprehensive error handling
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/critical_data',
    file_pattern='critical_*.csv',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_critical',
    deferrable=True,
    timeout=14400,  # 4 hours
    retries=2,
    retry_delay=timedelta(minutes=30),
    dag=dag
)

# Handle successful completion
success_handler = PythonOperator(
    task_id='handle_success',
    python_callable=handle_sensor_success,
    dag=dag
)

# Send failure notification
failure_notification = EmailOperator(
    task_id='failure_notification',
    to=['ops@company.com'],
    subject='SFTP Monitoring Failed - {{ ds }}',
    html_content='''
    <h3>SFTP Trigger Monitoring Failure</h3>
    <p>The deferrable SFTP sensor failed to find required files.</p>
    <p>Task: {{ task.task_id }}</p>
    <p>Execution Date: {{ ds }}</p>
    ''',
    trigger_rule='one_failed',
    dag=dag
)

deferrable_sensor >> success_handler
deferrable_sensor >> failure_notification

Resource-Efficient Batch Monitoring

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Configure for resource efficiency
dag = DAG(
    'sftp_batch_efficient',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    max_active_runs=10,  # Allow many concurrent deferrable tasks
    catchup=False
)

# Multiple sensors running concurrently with minimal resource usage
sensors = []
for i in range(5):
    sensor = SFTPSensor(
        task_id=f'monitor_batch_{i}',
        path=f'/remote/batch_{i}',
        file_pattern='*.csv',
        sftp_conn_id='sftp_default',
        deferrable=True,  # Each sensor uses minimal resources
        timeout=3600,
        dag=dag
    )
    sensors.append(sensor)

# All sensors can run concurrently without consuming worker slots

Trigger Lifecycle

Initialization and Serialization

When a deferrable sensor is executed:

  1. Sensor Execution: The sensor's execute() method calls self.defer()
  2. Trigger Creation: An SFTPTrigger instance is created with the specified parameters
  3. Serialization: The trigger is serialized using serialize() method for persistence
  4. Worker Release: The sensor releases its worker slot and the trigger runs asynchronously

Asynchronous Monitoring Loop

The trigger's run() method:

  1. Connection Setup: Establishes async SFTP connection using SFTPHookAsync
  2. Monitoring Loop: Continuously checks file conditions at specified intervals
  3. Condition Evaluation: Evaluates file existence, patterns, and modification times
  4. Event Generation: Yields TriggerEvent objects when conditions are met or timeouts occur
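Step 3's condition evaluation combines fnmatch pattern matching with a modification-time cutoff; a minimal sketch of that filtering logic (file names and timestamps below are made up):

```python
import fnmatch
from datetime import datetime, timezone

# (filename, modification time) pairs, as a directory listing might report them.
listing = [
    ("data_old.parquet", datetime(2023, 1, 1, 6, 0, tzinfo=timezone.utc)),
    ("data_new.parquet", datetime(2023, 1, 1, 9, 30, tzinfo=timezone.utc)),
    ("readme.txt", datetime(2023, 1, 1, 10, 0, tzinfo=timezone.utc)),
]


def matching_files(listing, pattern, newer_than=None):
    """Keep files matching the pattern and, if given, modified after newer_than."""
    matched = []
    for name, mtime in listing:
        if not fnmatch.fnmatch(name, pattern):
            continue  # fails the file_pattern condition
        if newer_than is not None and mtime <= newer_than:
            continue  # fails the newer_than condition
        matched.append(name)
    return matched


cutoff = datetime(2023, 1, 1, 8, 0, tzinfo=timezone.utc)
print(matching_files(listing, "data_*.parquet", newer_than=cutoff))  # ['data_new.parquet']
```

Only when this filtered list is non-empty does the trigger yield its success event; otherwise it sleeps for `poke_interval` and checks again.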

Completion and Callback

When the trigger completes:

  1. Event Yield: Trigger yields a final TriggerEvent with success/failure status
  2. Sensor Resumption: The sensor's execute_complete() method is called
  3. Result Processing: Sensor processes the trigger event and completes execution

Best Practices

Resource Management

  • Use deferrable sensors for long-running monitoring tasks to free up worker slots
  • Configure appropriate poke_interval values to balance responsiveness with server load
  • Implement reasonable timeout values to prevent indefinite waiting
  • Monitor trigger task queues to ensure adequate triggerer capacity

Performance Optimization

  • Use specific file patterns to reduce unnecessary server queries
  • Implement connection pooling for triggers monitoring the same SFTP server
  • Consider trigger serialization overhead for very high-frequency monitoring
  • Monitor async connection pool sizes for optimal performance

Error Handling

  • Implement appropriate retry strategies for trigger failures
  • Configure alerts for trigger timeout scenarios
  • Use proper exception handling in custom trigger implementations
  • Monitor trigger execution logs for connection issues

Scalability Considerations

  • Plan triggerer capacity based on expected concurrent deferrable tasks
  • Use database connection pooling for trigger state management
  • Implement proper cleanup for failed or interrupted triggers
  • Consider trigger resource limits in high-throughput scenarios

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-sftp
