tessl/pypi-apache-airflow-providers-sftp

SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations

Task Decorators

Simplified interfaces for creating SFTP-based tasks using Python decorators. The SFTP provider includes task decorators that enable more readable and maintainable DAG definitions for common SFTP operations, particularly file monitoring scenarios.

Capabilities

SFTP Sensor Task Decorator

Task decorator for creating SFTP sensor tasks with simplified syntax and enhanced functionality.

def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator:
    """
    Wrap a function into an Airflow SFTP sensor operator.
    
    Creates a decorated task that combines SFTP file monitoring with custom
    Python processing logic. The decorated function receives files_found
    in its keyword arguments when files are detected.
    
    Parameters:
    - python_callable: Function to decorate and execute when files are found
    - **kwargs: Additional arguments passed to the underlying SFTPSensor
    
    Returns:
    TaskDecorator that creates _DecoratedSFTPSensor instances
    """

Decorated SFTP Sensor Class

Internal implementation class for decorated SFTP sensor tasks.

class _DecoratedSFTPSensor(SFTPSensor):
    """
    Wraps a Python callable and captures args/kwargs when called for execution.
    
    Combines SFTP file monitoring capabilities with custom Python processing.
    Inherits all SFTPSensor functionality while adding decorator-specific
    handling for Python callable execution.
    """
    
    template_fields: Sequence[str] = ("op_args", "op_kwargs", *SFTPSensor.template_fields)
    custom_operator_name = "@task.sftp_sensor"
    shallow_copy_attrs: Sequence[str] = ("python_callable",)
    
    def __init__(
        self,
        *,
        task_id: str,
        **kwargs,
    ) -> None:
        """
        Initialize decorated SFTP sensor.
        
        Parameters:
        - task_id: Unique task identifier
        - **kwargs: Arguments passed to parent SFTPSensor class
        """

Usage Examples

Basic SFTP Sensor Decorator

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

@sftp_sensor_task(
    path='/remote/data',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def process_csv_files(files_found, **context):
    """Process CSV files when they are found."""
    print(f"Found {len(files_found)} CSV files: {files_found}")
    
    # Custom processing logic
    processed_files = []
    for file_path in files_found:
        print(f"Processing file: {file_path}")
        # Add your file processing logic here
        processed_files.append(f"processed_{file_path}")
    
    return {
        "status": "success",
        "processed_count": len(processed_files),
        "processed_files": processed_files
    }

# Instantiate the task in the DAG by calling the decorated function
process_csv_files()

Advanced File Processing with Decorator

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime, timedelta
import json

dag = DAG(
    'sftp_decorator_advanced',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

@sftp_sensor_task(
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.json',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_default',
    timeout=7200,  # 2 hours
    poke_interval=300,  # 5 minutes
    dag=dag
)
def validate_and_download_exports(files_found, **context):
    """Validate JSON exports and download them for processing."""
    hook = SFTPHook(ssh_conn_id='sftp_default')
    
    validated_files = []
    invalid_files = []
    
    for file_path in files_found:
        try:
            # Get file size for validation
            file_info = hook.describe_directory(file_path.rsplit('/', 1)[0])
            filename = file_path.rsplit('/', 1)[1]
            
            if filename in file_info:
                file_size = file_info[filename]['size']
                if file_size > 100:  # Minimum size check
                    validated_files.append(file_path)
                    print(f"Valid file: {file_path} ({file_size} bytes)")
                else:
                    invalid_files.append(file_path)
                    print(f"Invalid file (too small): {file_path} ({file_size} bytes)")
            
        except Exception as e:
            print(f"Error validating {file_path}: {e}")
            invalid_files.append(file_path)
    
    hook.close_conn()
    
    return {
        "valid_files": validated_files,
        "invalid_files": invalid_files,
        "validation_summary": {
            "total_found": len(files_found),
            "valid_count": len(validated_files),
            "invalid_count": len(invalid_files)
        }
    }

# Instantiate the task in the DAG
validate_and_download_exports()

Deferrable Sensor with Custom Processing

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
    max_active_runs=5
)

@sftp_sensor_task(
    path='/remote/realtime_data',
    file_pattern='sensor_data_*.parquet',
    newer_than='{{ ts }}',  # Only files newer than task execution time
    sftp_conn_id='sftp_realtime',
    deferrable=True,  # Use async trigger for resource efficiency
    timeout=3600,
    dag=dag
)
def process_sensor_data(files_found, **context):
    """Process real-time sensor data files."""
    execution_date = context['ds']
    task_instance = context['task_instance']
    
    print(f"Processing sensor data for {execution_date}")
    print(f"Found {len(files_found)} files: {files_found}")
    
    # Simulate processing logic
    processing_results = []
    for file_path in files_found:
        # Extract timestamp from filename
        filename = file_path.split('/')[-1]
        if 'sensor_data_' in filename:
            timestamp = filename.replace('sensor_data_', '').replace('.parquet', '')
            processing_results.append({
                "file": file_path,
                "timestamp": timestamp,
                "status": "processed"
            })
    
    # Push results to XCom for downstream tasks
    task_instance.xcom_push(key='processing_results', value=processing_results)
    
    return {
        "execution_date": execution_date,
        "files_processed": len(processing_results),
        "processing_results": processing_results
    }

# Instantiate the task in the DAG
process_sensor_data()

Multiple Pattern Monitoring with Decorator

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

@sftp_sensor_task(
    path='/remote/mixed_data',
    file_pattern='*',  # Monitor all files
    sftp_conn_id='sftp_default',
    dag=dag
)
def categorize_files(files_found, **context):
    """Categorize found files by type and process accordingly."""
    
    categorized = {
        'csv_files': [],
        'json_files': [],
        'xml_files': [],
        'other_files': []
    }
    
    for file_path in files_found:
        filename = file_path.lower()
        if filename.endswith('.csv'):
            categorized['csv_files'].append(file_path)
        elif filename.endswith('.json'):
            categorized['json_files'].append(file_path)
        elif filename.endswith('.xml'):
            categorized['xml_files'].append(file_path)
        else:
            categorized['other_files'].append(file_path)
    
    # Log categorization results
    for category, files in categorized.items():
        if files:
            print(f"{category}: {len(files)} files")
            for file in files:
                print(f"  - {file}")
    
    return categorized

# Instantiate the task in the DAG
categorize_files()

Error Handling in Decorated Tasks

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

@sftp_sensor_task(
    path='/remote/critical_files',
    file_pattern='critical_*.txt',
    newer_than='{{ ds }}T06:00:00',
    sftp_conn_id='sftp_critical',
    timeout=7200,
    retries=2,
    retry_delay=timedelta(minutes=15),
    dag=dag
)
def process_critical_files(files_found, **context):
    """Process critical files with comprehensive error handling."""
    
    if not files_found:
        raise AirflowException("No critical files found - this should not happen")
    
    try:
        processed_files = []
        failed_files = []
        
        for file_path in files_found:
            try:
                # Simulate file processing
                print(f"Processing critical file: {file_path}")
                
                # Add your critical file processing logic here
                # For example: data validation, format checking, etc.
                
                # Simulate processing success/failure
                if "invalid" not in file_path.lower():
                    processed_files.append(file_path)
                    print(f"Successfully processed: {file_path}")
                else:
                    failed_files.append(file_path)
                    print(f"Processing failed: {file_path}")
                    
            except Exception as e:
                failed_files.append(file_path)
                print(f"Error processing {file_path}: {e}")
        
        # Check if any critical files failed
        if failed_files:
            error_msg = f"Failed to process {len(failed_files)} critical files: {failed_files}"
            print(error_msg)
            # Decide whether to fail the task or just warn
            if len(failed_files) > len(processed_files):
                raise AirflowException(error_msg)
        
        return {
            "total_files": len(files_found),
            "processed_files": processed_files,
            "failed_files": failed_files,
            "success_rate": len(processed_files) / len(files_found) * 100
        }
        
    except Exception as e:
        print(f"Critical error in file processing: {e}")
        raise AirflowException(f"Critical file processing failed: {e}")

# Instantiate the task in the DAG
process_critical_files()

Integration with Downstream Tasks

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_downstream(**context):
    """Process results from decorated SFTP sensor."""
    # Pull results from the decorated sensor
    sensor_results = context['task_instance'].xcom_pull(task_ids='monitor_data_files')
    
    print(f"Received sensor results: {sensor_results}")
    
    if sensor_results and 'files_found' in sensor_results:
        files = sensor_results['files_found']
        print(f"Processing {len(files)} files downstream")
        
        # Add downstream processing logic
        for file_path in files:
            print(f"Downstream processing: {file_path}")
    
    return "Downstream processing complete"

dag = DAG(
    'sftp_decorator_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=3)
)

# Decorated sensor task
@sftp_sensor_task(
    task_id='monitor_data_files',
    path='/remote/data_pipeline',
    file_pattern='pipeline_*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def monitor_and_validate(files_found, **context):
    """Monitor files and perform initial validation."""
    validated_files = []
    
    for file_path in files_found:
        # Perform validation logic
        if file_path.endswith('.csv'):
            validated_files.append(file_path)
            print(f"Validated: {file_path}")
    
    return {
        "files_found": files_found,
        "validated_files": validated_files,
        "validation_count": len(validated_files)
    }

# Downstream processing task
downstream_task = PythonOperator(
    task_id='downstream_processing',
    python_callable=process_downstream,
    dag=dag
)

# Calling the decorated function instantiates the sensor task and returns an
# XComArg that can be wired to downstream tasks
monitor_and_validate() >> downstream_task

Template Usage in Decorators

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

dag = DAG(
    'sftp_decorator_templating',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

@sftp_sensor_task(
    path='/remote/daily/{{ ds }}',  # Templated path
    file_pattern='data_{{ ds_nodash }}_*.json',  # Templated pattern
    newer_than='{{ ds }}T05:00:00',  # Templated time
    sftp_conn_id='sftp_default',
    timeout=14400,  # 4 hours
    dag=dag
)
def process_daily_data(files_found, **context):
    """Process daily data files using Airflow templating."""
    execution_date = context['ds']
    formatted_date = context['ds_nodash']
    
    print(f"Processing daily data for {execution_date}")
    print(f"Looking for pattern: data_{formatted_date}_*.json")
    print(f"Found {len(files_found)} files")
    
    daily_summary = {
        "execution_date": execution_date,
        "formatted_date": formatted_date,
        "files_found": files_found,
        "file_count": len(files_found)
    }
    
    # Process each file
    for file_path in files_found:
        print(f"Processing daily file: {file_path}")
        # Add daily file processing logic
    
    return daily_summary

# Instantiate the task in the DAG
process_daily_data()

Decorator Benefits

Simplified Syntax

  • Combines sensor logic with custom processing in a single function
  • Reduces boilerplate code compared to separate sensor and processing tasks
  • Provides cleaner DAG definitions with decorator syntax
  • Enables direct access to found files in the decorated function

Enhanced Functionality

  • Automatic handling of files_found parameter injection
  • Seamless integration with Airflow's templating system
  • Built-in XCom handling for downstream task communication
  • Support for all SFTPSensor parameters and configurations

Improved Maintainability

  • Co-locates sensor configuration with processing logic
  • Reduces task dependencies and complex XCom passing
  • Provides clear function signatures for custom processing
  • Enables better code organization and reusability

Best Practices

Function Design

  • Keep decorated functions focused on processing found files
  • Use descriptive function names that indicate the processing purpose
  • Document function parameters and return values clearly
  • Handle edge cases like empty file lists gracefully
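As a sketch of the last point, a processing function can treat an empty or missing file list as a valid, well-formed result. The function below is shown undecorated so it stands alone; in a DAG, `@sftp_sensor_task` would inject `files_found` at runtime, and the function name is hypothetical:

```python
# Hypothetical processing function: tolerates an empty or missing
# files_found list instead of assuming the sensor always found files.
def summarize_found_files(files_found=None, **context):
    files_found = files_found or []
    if not files_found:
        # Return a well-formed "nothing to do" result rather than raising
        return {"file_count": 0, "files": [], "status": "no_files"}
    return {
        "file_count": len(files_found),
        "files": sorted(files_found),
        "status": "ok",
    }
```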

Error Handling

  • Implement proper exception handling within decorated functions
  • Use Airflow exceptions for task failures that should stop the pipeline
  • Log processing steps for debugging and monitoring
  • Consider partial failure scenarios for batch file processing
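The partial-failure point can be sketched as a small helper, not part of the provider, that collects per-file failures and only fails the whole batch when a failure threshold is exceeded. Inside a DAG you would raise AirflowException rather than RuntimeError:

```python
# Hypothetical helper illustrating a partial-failure policy for batch
# file processing: individual failures are collected, and the batch only
# fails outright when too large a fraction of the files failed.
def run_batch(files_found, process_one, max_failure_ratio=0.5):
    processed, failed = [], []
    for path in files_found:
        try:
            process_one(path)
            processed.append(path)
        except Exception:
            failed.append(path)
    if files_found and len(failed) / len(files_found) > max_failure_ratio:
        raise RuntimeError(f"{len(failed)}/{len(files_found)} files failed")
    return {"processed": processed, "failed": failed}
```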

Performance Considerations

  • Avoid heavy processing within the decorated function for large file sets
  • Consider using the decorator for coordination and separate tasks for processing
  • Use appropriate timeout values for sensor configuration
  • Monitor memory usage when processing file metadata
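One way to keep the decorated function lightweight, assuming downstream tasks do the heavy per-file work, is to have it only batch the found paths. The helper below is a hypothetical sketch, not provider API:

```python
# Hypothetical helper: split a possibly large list of found files into
# fixed-size batches so the decorated sensor stays a cheap coordinator
# and downstream tasks handle the actual processing.
def batch_files(files_found, batch_size=100):
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [
        files_found[i:i + batch_size]
        for i in range(0, len(files_found), batch_size)
    ]
```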

Integration Patterns

  • Use return values to pass results to downstream tasks via XCom
  • Implement consistent return value structures across decorated tasks
  • Consider using the decorator for validation and coordination logic
  • Combine with other Airflow operators for complex workflows
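A consistent return structure can be enforced with a small shared helper that every decorated task uses to build its XCom payload. The helper and its field names are a hypothetical convention, not part of the provider:

```python
# Hypothetical convention helper: every decorated SFTP task returns the
# same result envelope so downstream tasks can consume XCom payloads
# uniformly.
def make_sensor_result(files_found, processed, failed, extra=None):
    result = {
        "total_found": len(files_found),
        "processed_files": list(processed),
        "failed_files": list(failed),
        "success": len(failed) == 0,
    }
    if extra:
        result.update(extra)
    return result
```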

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-sftp
