Provider package for Apache Airflow that enables FTP file transfer protocol operations including hooks, operators, and sensors for workflow integration.
84
Sensor classes for waiting and monitoring file or directory availability on FTP servers with configurable error handling, retry logic, and support for both standard FTP and secure FTPS protocols.
Primary sensor for monitoring file or directory presence on FTP servers with intelligent error handling and transient error recovery.
class FTPSensor(BaseSensorOperator):
"""
Wait for file or directory to be present on FTP server.
Template Fields: ("path",)
Parameters:
- path (str): Remote file or directory path to monitor
- ftp_conn_id (str): FTP connection ID (default: "ftp_default")
- fail_on_transient_errors (bool): Fail on transient errors (default: True)
"""
template_fields: Sequence[str] = ("path",)
# Transient FTP error codes that can be retried
transient_errors = [421, 425, 426, 434, 450, 451, 452]
error_code_pattern = re.compile(r"\d+")
def __init__(
self,
*,
path: str,
ftp_conn_id: str = "ftp_default",
fail_on_transient_errors: bool = True,
**kwargs
) -> None: ...
def poke(self, context: Context) -> bool:
"""
Check if file or directory exists on FTP server.
Parameters:
- context (Context): Airflow task context
Returns:
bool: True if file/directory exists, False otherwise
"""
def _create_hook(self) -> FTPHook:
"""
Create and return FTPHook instance.
Returns:
FTPHook: Configured FTP hook
"""
def _get_error_code(self, e) -> int | Exception:
"""
Extract numeric error code from FTP exception.
Parameters:
- e (Exception): FTP exception
Returns:
int | Exception: Extracted error code or original exception
"""Secure sensor for monitoring files on FTPS servers with SSL/TLS encryption support.
class FTPSSensor(FTPSensor):
    """
    Wait for a file or directory to be present on an FTPS server.

    Inherits all FTPSensor behavior, but connects over FTPS
    (FTP with SSL/TLS encryption).
    """

    def _create_hook(self) -> FTPHook:
        """
        Create and return an FTPSHook instance.

        :return: Configured FTPS hook with SSL/TLS support.
        """

from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor
from datetime import datetime, timedelta

dag = DAG('ftp_monitoring_example', start_date=datetime(2023, 1, 1))

# Wait for daily data file to appear
wait_for_data = FTPSensor(
    task_id='wait_for_daily_data',
    path='/remote/data/daily_report_{{ ds }}.csv',  # Templated path
    ftp_conn_id='my_ftp_connection',
    poke_interval=60,  # Check every minute
    timeout=3600,      # Timeout after 1 hour
    mode='poke',       # Blocking mode (holds a worker slot while waiting)
    dag=dag,
)

from airflow.providers.ftp.sensors.ftp import FTPSensor
# Wait for any file to appear in directory
wait_for_directory = FTPSensor(
    task_id='wait_for_directory_content',
    path='/remote/inbox/',  # Monitor directory
    ftp_conn_id='my_ftp_connection',
    poke_interval=300,  # Check every 5 minutes
    timeout=7200,       # Timeout after 2 hours
    dag=dag,
)

from airflow.providers.ftp.sensors.ftp import FTPSSensor
# Monitor secure FTP server
wait_for_secure_file = FTPSSensor(
    task_id='wait_for_secure_data',
    path='/secure/confidential/data.xml',
    ftp_conn_id='my_secure_ftp_connection',  # FTPS connection
    poke_interval=120,  # Check every 2 minutes
    timeout=1800,       # Timeout after 30 minutes
    dag=dag,
)

from airflow.providers.ftp.sensors.ftp import FTPSensor
# Handle transient errors gracefully
resilient_sensor = FTPSensor(
    task_id='resilient_file_monitor',
    path='/remote/unreliable_source/data.txt',
    ftp_conn_id='unreliable_ftp',
    fail_on_transient_errors=False,  # Don't fail on transient errors
    poke_interval=180,  # Check every 3 minutes
    timeout=10800,      # Extended timeout (3 hours)
    retries=3,          # Retry on permanent failures
    retry_delay=timedelta(minutes=10),
    dag=dag,
)

from airflow.providers.ftp.sensors.ftp import FTPSensor
# Use reschedule mode to free up worker slots between pokes
long_running_sensor = FTPSensor(
    task_id='long_running_file_monitor',
    path='/remote/batch_data/weekly_export.zip',
    ftp_conn_id='batch_ftp',
    poke_interval=1800,  # Check every 30 minutes
    timeout=604800,      # Timeout after 1 week
    mode='reschedule',   # Non-blocking mode
    dag=dag,
)

from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor, FTPSSensor
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def validate_file():
"""Validate downloaded file format and content."""
print("Validating file format and content...")
# File validation logic here
return True
def notify_completion():
"""Send notification about successful processing."""
print("Sending completion notification...")
# Notification logic here
dag = DAG(
'comprehensive_ftp_monitoring',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(hours=6), # Run every 6 hours
catchup=False
)
# Monitor multiple sources simultaneously
wait_for_source1 = FTPSensor(
task_id='wait_for_source1_data',
path='/source1/data/{{ ds }}/export.csv',
ftp_conn_id='source1_ftp',
poke_interval=300,
timeout=3600,
dag=dag
)
wait_for_source2 = FTPSSensor( # Secure source
task_id='wait_for_source2_data',
path='/secure/source2/{{ ds }}/sensitive_data.xml',
ftp_conn_id='secure_ftp',
poke_interval=300,
timeout=3600,
dag=dag
)
# Download files once available
download_source1 = FTPFileTransmitOperator(
task_id='download_source1',
ftp_conn_id='source1_ftp',
remote_filepath='/source1/data/{{ ds }}/export.csv',
local_filepath='/local/staging/source1_{{ ds }}.csv',
operation=FTPOperation.GET,
create_intermediate_dirs=True,
dag=dag
)
download_source2 = FTPFileTransmitOperator(
task_id='download_source2',
ftp_conn_id='secure_ftp',
remote_filepath='/secure/source2/{{ ds }}/sensitive_data.xml',
local_filepath='/local/staging/source2_{{ ds }}.xml',
operation=FTPOperation.GET,
create_intermediate_dirs=True,
dag=dag
)
# Validate downloaded files
validate_files = PythonOperator(
task_id='validate_files',
python_callable=validate_file,
dag=dag
)
# Send completion notification
notify = PythonOperator(
task_id='notify_completion',
python_callable=notify_completion,
dag=dag
)
# Define task dependencies
[wait_for_source1, wait_for_source2] >> [download_source1, download_source2]
[download_source1, download_source2] >> validate_files >> notifyThe sensor automatically handles these transient FTP error codes by retrying rather than failing:
The fail_on_transient_errors parameter controls how transient errors are handled:

- True (default): raises an exception and fails the task
- False: returns False and continues poking

Best practices:

- Set fail_on_transient_errors=False for unreliable FTP servers
- Tune poke_interval to balance responsiveness against server load
- Use mode='reschedule' for long-running sensors to free worker slots

The path parameter supports Airflow templating:
# Template examples
path='/data/{{ ds }}/daily_report.csv'                # Execution date
path='/data/{{ macros.ds_add(ds, -1) }}/file.txt'     # Previous day
path='/hourly/{{ ts_nodash_with_tz[:10] }}/data.csv'  # Hour-based paths

Sensors use Airflow's connection pooling and automatic cleanup, ensuring efficient resource utilization across multiple concurrent monitoring tasks.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-ftp

Evals:
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10