
tessl/pypi-apache-airflow-providers-ftp

Provider package for Apache Airflow that enables FTP file transfer protocol operations including hooks, operators, and sensors for workflow integration.


docs/ftp-sensors.md

File Monitoring

Sensor classes for waiting and monitoring file or directory availability on FTP servers with configurable error handling, retry logic, and support for both standard FTP and secure FTPS protocols.

Capabilities

FTP Sensor

Primary sensor for monitoring file or directory presence on FTP servers with intelligent error handling and transient error recovery.

class FTPSensor(BaseSensorOperator):
    """
    Wait for file or directory to be present on FTP server.
    
    Template Fields: ("path",)
    
    Parameters:
    - path (str): Remote file or directory path to monitor
    - ftp_conn_id (str): FTP connection ID (default: "ftp_default")
    - fail_on_transient_errors (bool): Fail on transient errors (default: True)
    """
    
    template_fields: Sequence[str] = ("path",)
    
    # Transient FTP error codes that can be retried
    transient_errors = [421, 425, 426, 434, 450, 451, 452]
    error_code_pattern = re.compile(r"\d+")
    
    def __init__(
        self,
        *,
        path: str,
        ftp_conn_id: str = "ftp_default",
        fail_on_transient_errors: bool = True,
        **kwargs
    ) -> None: ...
    
    def poke(self, context: Context) -> bool:
        """
        Check if file or directory exists on FTP server.
        
        Parameters:
        - context (Context): Airflow task context
        
        Returns:
        bool: True if file/directory exists, False otherwise
        """
    
    def _create_hook(self) -> FTPHook:
        """
        Create and return FTPHook instance.
        
        Returns:
        FTPHook: Configured FTP hook
        """
    
    def _get_error_code(self, e) -> int | Exception:
        """
        Extract numeric error code from FTP exception.
        
        Parameters:
        - e (Exception): FTP exception
        
        Returns:
        int | Exception: Extracted error code or original exception
        """

FTPS Sensor

Secure sensor for monitoring files on FTPS servers with SSL/TLS encryption support.

class FTPSSensor(FTPSensor):
    """
    Wait for file or directory to be present on FTPS server.
    
    Inherits all FTPSensor functionality with SSL/TLS encryption support.
    """
    
    def _create_hook(self) -> FTPHook:
        """
        Create and return FTPSHook instance.
        
        Returns:
        FTPHook: An FTPSHook (FTPHook subclass) configured with SSL/TLS support
        """

Usage Examples

Basic File Monitoring

from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor
from datetime import datetime, timedelta

dag = DAG('ftp_monitoring_example', start_date=datetime(2023, 1, 1))

# Wait for daily data file to appear
wait_for_data = FTPSensor(
    task_id='wait_for_daily_data',
    path='/remote/data/daily_report_{{ ds }}.csv',  # Templated path
    ftp_conn_id='my_ftp_connection',
    poke_interval=60,     # Check every minute
    timeout=3600,         # Timeout after 1 hour
    mode='poke',          # Blocking mode
    dag=dag
)

Directory Monitoring

from airflow.providers.ftp.sensors.ftp import FTPSensor

# Wait for any file to appear in directory
wait_for_directory = FTPSensor(
    task_id='wait_for_directory_content',
    path='/remote/inbox/',  # Monitor directory
    ftp_conn_id='my_ftp_connection',
    poke_interval=300,      # Check every 5 minutes
    timeout=7200,           # Timeout after 2 hours
    dag=dag
)

Secure File Monitoring with FTPS

from airflow.providers.ftp.sensors.ftp import FTPSSensor

# Monitor secure FTP server
wait_for_secure_file = FTPSSensor(
    task_id='wait_for_secure_data',
    path='/secure/confidential/data.xml',
    ftp_conn_id='my_secure_ftp_connection',  # FTPS connection
    poke_interval=120,      # Check every 2 minutes
    timeout=1800,           # Timeout after 30 minutes
    dag=dag
)

Advanced Error Handling Configuration

from airflow.providers.ftp.sensors.ftp import FTPSensor
from datetime import timedelta

# Handle transient errors gracefully
resilient_sensor = FTPSensor(
    task_id='resilient_file_monitor',
    path='/remote/unreliable_source/data.txt',
    ftp_conn_id='unreliable_ftp',
    fail_on_transient_errors=False,  # Don't fail on transient errors
    poke_interval=180,               # Check every 3 minutes
    timeout=10800,                   # Extended timeout (3 hours)
    retries=3,                       # Retry on permanent failures
    retry_delay=timedelta(minutes=10),
    dag=dag
)

Reschedule Mode for Long-Running Monitoring

from airflow.providers.ftp.sensors.ftp import FTPSensor

# Use reschedule mode to free up worker slots
long_running_sensor = FTPSensor(
    task_id='long_running_file_monitor',
    path='/remote/batch_data/weekly_export.zip',
    ftp_conn_id='batch_ftp',
    poke_interval=1800,     # Check every 30 minutes
    timeout=604800,         # Timeout after 1 week
    mode='reschedule',      # Non-blocking mode
    dag=dag
)

Complete Monitoring Pipeline

from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor, FTPSSensor
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def validate_file():
    """Validate downloaded file format and content."""
    print("Validating file format and content...")
    # File validation logic here
    return True

def notify_completion():
    """Send notification about successful processing."""
    print("Sending completion notification...")
    # Notification logic here

dag = DAG(
    'comprehensive_ftp_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),  # Run every 6 hours
    catchup=False
)

# Monitor multiple sources simultaneously
wait_for_source1 = FTPSensor(
    task_id='wait_for_source1_data',
    path='/source1/data/{{ ds }}/export.csv',
    ftp_conn_id='source1_ftp',
    poke_interval=300,
    timeout=3600,
    dag=dag
)

wait_for_source2 = FTPSSensor(  # Secure source
    task_id='wait_for_source2_data',
    path='/secure/source2/{{ ds }}/sensitive_data.xml',
    ftp_conn_id='secure_ftp',
    poke_interval=300,
    timeout=3600,
    dag=dag
)

# Download files once available
download_source1 = FTPFileTransmitOperator(
    task_id='download_source1',
    ftp_conn_id='source1_ftp',
    remote_filepath='/source1/data/{{ ds }}/export.csv',
    local_filepath='/local/staging/source1_{{ ds }}.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

download_source2 = FTPFileTransmitOperator(
    task_id='download_source2',
    ftp_conn_id='secure_ftp',
    remote_filepath='/secure/source2/{{ ds }}/sensitive_data.xml',
    local_filepath='/local/staging/source2_{{ ds }}.xml',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

# Validate downloaded files
validate_files = PythonOperator(
    task_id='validate_files',
    python_callable=validate_file,
    dag=dag
)

# Send completion notification
notify = PythonOperator(
    task_id='notify_completion',
    python_callable=notify_completion,
    dag=dag
)

# Define task dependencies
wait_for_source1 >> download_source1
wait_for_source2 >> download_source2
[download_source1, download_source2] >> validate_files >> notify

Error Handling and Recovery

Transient Error Codes

The sensor automatically handles these transient FTP error codes by retrying rather than failing:

  • 421: Service not available, closing control connection
  • 425: Can't open data connection
  • 426: Connection closed; transfer aborted
  • 434: Requested host unavailable
  • 450: Requested file action not taken (file unavailable)
  • 451: Requested action aborted: local error in processing
  • 452: Requested action not taken (insufficient storage)

Error Handling Strategies

  1. Error 550 (file/directory not found): Sensor returns False and continues poking; this is the normal "not there yet" case
  2. Transient Errors (codes listed above): Behavior depends on the fail_on_transient_errors parameter:
    • True (default): Raises the exception and fails the task
    • False: Returns False and continues poking
  3. Other Permanent and Connection Errors: Propagated to Airflow, which applies the task's retry settings
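
The decision rules above can be sketched as a small helper (a simplified approximation of the sensor's behavior, not the provider's exact code path):

```python
import ftplib

TRANSIENT_ERRORS = {421, 425, 426, 434, 450, 451, 452}

def keep_poking(error_code: int, fail_on_transient_errors: bool) -> bool:
    """Return True to keep poking, or raise to fail the task (simplified)."""
    if error_code == 550:
        return True  # file not found yet: the normal "not there" case
    if not fail_on_transient_errors and error_code in TRANSIENT_ERRORS:
        return True  # tolerated transient error: keep poking
    # any other permanent error, or a transient one with the default setting
    raise ftplib.error_perm(f"{error_code} FTP error")

assert keep_poking(550, fail_on_transient_errors=True)
assert keep_poking(450, fail_on_transient_errors=False)
```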

Best Practices

  • Use fail_on_transient_errors=False for unreliable FTP servers
  • Set appropriate poke_interval to balance responsiveness and server load
  • Use mode='reschedule' for long-running sensors to free worker slots
  • Configure retries and retry delays for better fault tolerance
  • Monitor sensor logs for patterns in transient errors

Integration with Airflow Features

Templating Support

The path parameter supports Airflow templating:

# Template examples
path='/data/{{ ds }}/daily_report.csv'              # Execution date (YYYY-MM-DD)
path='/data/{{ macros.ds_add(ds, -1) }}/file.txt'   # Previous day
path='/hourly/{{ ts_nodash[:11] }}/data.csv'        # Date plus hour, e.g. 20230101T00
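
To see what such templates resolve to, the substitution can be approximated with plain string formatting (illustrative only; Airflow renders these with Jinja and its macro context, and `prev_ds` here stands in for `macros.ds_add(ds, -1)`):

```python
from datetime import date, timedelta

# Hypothetical logical date for the run
logical = date(2023, 1, 1)
context = {
    "ds": logical.isoformat(),                           # "2023-01-01"
    "prev_ds": (logical - timedelta(days=1)).isoformat() # "2022-12-31"
}

rendered = "/data/{ds}/daily_report.csv".format(**context)
assert rendered == "/data/2023-01-01/daily_report.csv"

previous = "/data/{prev_ds}/file.txt".format(**context)
assert previous == "/data/2022-12-31/file.txt"
```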

Sensor Modes

  • Poke Mode (default): Blocks worker slot while waiting
  • Reschedule Mode: Releases worker slot between checks, better for long waits

Connection Management

Sensors use Airflow's connection pooling and automatic cleanup, ensuring efficient resource utilization across multiple concurrent monitoring tasks.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-ftp
