SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers. The SFTP sensor provides comprehensive file system monitoring capabilities with support for both blocking and deferrable execution modes.
Main sensor for monitoring file and directory presence on SFTP servers with extensive filtering and condition checking capabilities.
class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present on SFTP.

    Monitors SFTP locations for file presence, pattern matching, and modification
    time conditions. Supports both synchronous polling and asynchronous deferrable
    execution for efficient resource utilization.
    """

    # Only "path" and "newer_than" are rendered with Jinja templates.
    template_fields: Sequence[str] = ("path", "newer_than")

    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        python_callable: Callable | None = None,
        op_args: list | None = None,
        op_kwargs: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        """
        Initialize SFTP sensor.

        Parameters:
        - path: Remote file or directory path (templated)
        - file_pattern: Pattern for file matching using fnmatch format
        - sftp_conn_id: Connection to run the sensor against (default: sftp_default)
        - newer_than: DateTime for which file should be newer than (templated)
        - python_callable: Optional callable to execute when files are found
        - op_args: Arguments for python_callable
        - op_kwargs: Keyword arguments for python_callable
        - deferrable: Whether to defer the task until done (default: False)
        """

    def poke(self, context: Context) -> PokeReturnValue | bool:
        """
        Check file conditions and return status.

        Performs the core sensing logic including file existence checks,
        pattern matching, modification time comparisons, and optional
        python callable execution.

        Parameters:
        - context: Airflow task execution context

        Returns:
        PokeReturnValue with completion status and XCom values, or boolean
        indicating whether conditions are met
        """

    def execute(self, context: Context) -> Any:
        """
        Execute the sensor, either synchronously or by deferring to trigger.

        When deferrable=False, uses traditional polling approach.
        When deferrable=True, defers to SFTPTrigger for async monitoring.

        Parameters:
        - context: Airflow task execution context

        Returns:
        Sensor result or defers to trigger for async execution
        """

    def execute_complete(self, context: dict[str, Any], event: Any = None) -> None:
        """
        Execute callback when the trigger fires.

        Called when deferrable sensor completes via trigger.
        Processes trigger results and returns immediately.

        Parameters:
        - context: Airflow task execution context
        - event: Event data from the trigger
        """

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Example: wait for a single, known file path on the SFTP server.
dag = DAG(
    'sftp_sensor_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Wait for a specific file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_data_file',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_default',
    timeout=3600,  # Wait up to 1 hour
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Example: wait for files matching fnmatch-style patterns.
dag = DAG(
    'sftp_sensor_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for any CSV file matching a pattern
wait_for_csv_files = SFTPSensor(
    task_id='wait_for_csv_files',
    path='/remote/incoming',
    file_pattern='daily_report_*.csv',  # Match files like daily_report_20230101.csv
    sftp_conn_id='sftp_default',
    timeout=7200,  # Wait up to 2 hours
    poke_interval=600,  # Check every 10 minutes
    dag=dag
)

# Wait for files with date patterns
wait_for_dated_files = SFTPSensor(
    task_id='wait_for_dated_files',
    path='/remote/exports',
    # NOTE(review): template_fields lists only "path" and "newer_than" —
    # confirm the Jinja expression in file_pattern is actually rendered.
    file_pattern='export_{{ ds_nodash }}_*.json',  # Templated pattern
    sftp_conn_id='sftp_default',
    dag=dag
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Example: gate on file modification time via newer_than (templated field).
dag = DAG(
    'sftp_sensor_mod_time',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

# Wait for file newer than a specific time
wait_for_recent_file = SFTPSensor(
    task_id='wait_for_recent_file',
    path='/remote/data/latest.csv',
    newer_than='2023-01-01T00:00:00',  # ISO format string
    sftp_conn_id='sftp_default',
    dag=dag
)

# Wait for file newer than task execution time
wait_for_fresh_file = SFTPSensor(
    task_id='wait_for_fresh_file',
    path='/remote/data/hourly.json',
    newer_than='{{ ts }}',  # Templated to task execution time
    sftp_conn_id='sftp_default',
    dag=dag
)

# Wait for file newer than yesterday
wait_for_daily_update = SFTPSensor(
    task_id='wait_for_daily_update',
    path='/remote/reports',
    file_pattern='daily_*.csv',
    newer_than='{{ yesterday_ds }}T00:00:00',  # Yesterday at midnight
    sftp_conn_id='sftp_default',
    dag=dag
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta
def process_found_files(files_found, **context):
    """Report each discovered file and summarize the batch.

    Prints one summary line, then one line per file, and returns a result
    dict with the processed count and a success status. Extra keyword
    arguments (the Airflow context / op_kwargs) are accepted but unused.
    """
    total = len(files_found)
    print(f"Found {total} files: {files_found}")
    # Custom logic for file validation
    for remote_path in files_found:
        print(f"Processing: {remote_path}")
        # Add custom validation or processing logic
    return {"processed_files": total, "status": "success"}
# Example: run a python_callable against the matched files on each poke.
dag = DAG(
    'sftp_sensor_callable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2)
)

sensor_with_processing = SFTPSensor(
    task_id='sensor_with_processing',
    path='/remote/incoming',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    python_callable=process_found_files,
    op_kwargs={'extra_param': 'custom_value'},
    dag=dag
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Example: deferrable sensing hands waiting off to the triggerer.
dag = DAG(
    'sftp_sensor_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Use deferrable sensor to free up worker slots while waiting
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/large_files',
    file_pattern='bigdata_*.parquet',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Use async trigger instead of blocking
    timeout=14400,  # Wait up to 4 hours
    dag=dag
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def validate_files(**context):
    """Validate downloaded files before processing.

    Pulls the file list the sensor task pushed to XCom, logs it, and
    returns True (the actual validation logic is a placeholder).
    """
    sensor_result = context['task_instance'].xcom_pull(task_ids='wait_for_source_files')
    print(f"Validating files: {sensor_result}")
    # Add validation logic
    return True
# Example: end-to-end pipeline — sense, download, validate, await completion flag.
dag = DAG(
    'sftp_complex_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for multiple files with specific pattern and recency
wait_for_source_files = SFTPSensor(
    task_id='wait_for_source_files',
    path='/remote/daily_exports',
    # NOTE(review): file_pattern is not in template_fields ("path", "newer_than") —
    # confirm the Jinja expression below is actually rendered.
    file_pattern='export_{{ ds_nodash }}_*.csv',
    newer_than='{{ ds }}T06:00:00',  # Files newer than 6 AM on execution date
    sftp_conn_id='sftp_source',
    timeout=10800,  # 3 hours timeout
    poke_interval=900,  # Check every 15 minutes
    dag=dag
)

# Download files once they're available
download_files = SFTPOperator(
    task_id='download_files',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/{{ ds }}/',
    # NOTE(review): confirm SFTPOperator GET expands the wildcard in this path.
    remote_filepath='/remote/daily_exports/export_{{ ds_nodash }}_*.csv',
    operation=SFTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

# Validate downloaded files
validate = PythonOperator(
    task_id='validate_files',
    python_callable=validate_files,
    dag=dag
)

# Wait for processing completion signal
wait_for_completion = SFTPSensor(
    task_id='wait_for_completion',
    path='/remote/status/processing_complete_{{ ds_nodash }}.flag',
    sftp_conn_id='sftp_source',
    timeout=7200,  # 2 hours for processing
    dag=dag
)

wait_for_source_files >> download_files >> validate >> wait_for_completion

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Example: directory-level monitoring with wildcard patterns.
dag = DAG(
    'sftp_directory_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

# Monitor for any file in a directory
wait_for_any_file = SFTPSensor(
    task_id='wait_for_any_file',
    path='/remote/incoming',
    file_pattern='*',  # Match any file
    sftp_conn_id='sftp_default',
    dag=dag
)

# Monitor for specific file types
wait_for_json_files = SFTPSensor(
    task_id='wait_for_json_files',
    path='/remote/api_exports',
    file_pattern='*.json',
    newer_than='{{ ds }}T00:00:00',  # Today's files only
    sftp_conn_id='sftp_default',
    dag=dag
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

# Example: critical-path monitoring with retry policy and failure alerting.
default_args = {
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG(
    'sftp_sensor_monitoring',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Critical file monitoring with failure notifications
critical_file_sensor = SFTPSensor(
    task_id='critical_file_sensor',
    path='/remote/critical/daily_feed.csv',
    newer_than='{{ ds }}T07:00:00',  # Must be from today after 7 AM
    sftp_conn_id='sftp_critical',
    timeout=7200,  # 2 hour timeout
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# Send alert if sensor fails
failure_alert = EmailOperator(
    task_id='failure_alert',
    to=['ops@company.com'],
    subject='Critical SFTP File Missing - {{ ds }}',
    html_content='''
<h3>Alert: Critical SFTP File Missing</h3>
<p>The daily feed file was not found within the expected timeframe.</p>
<p>Execution Date: {{ ds }}</p>
<p>Please check the SFTP server and data pipeline.</p>
''',
    trigger_rule='one_failed',  # Trigger on sensor failure
    dag=dag
)

critical_file_sensor >> failure_alert

# Usage tips:
# - Tune poke_interval values to balance responsiveness with server load.
# - Choose timeout values based on expected file arrival patterns.
# - Use deferrable=True for long-running sensors to free up worker slots.
#
# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-sftp