SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations
—
Quality: Pending — a best-practices review of this provider has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Trigger components for deferrable SFTP operations that enable efficient resource utilization by yielding control during long-running file monitoring operations. The SFTP trigger provides asynchronous file system monitoring capabilities for high-performance workflows.
Asynchronous trigger for deferrable SFTP file monitoring operations.
class SFTPTrigger(BaseTrigger):
    """
    SFTPTrigger that fires when file conditions are met on SFTP server.

    Provides asynchronous monitoring of SFTP locations for file presence,
    pattern matching, and modification time conditions. Designed for use
    with deferrable sensors to optimize resource utilization.
    """

    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None:
        """
        Initialize SFTP trigger.

        Parameters:
        - path: Path on SFTP server to search for files
        - file_pattern: Pattern to match against file list using fnmatch
        - sftp_conn_id: SFTP connection ID for connecting to server
        - newer_than: DateTime threshold for file modification time filtering
        - poke_interval: How often, in seconds, to check for file existence
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize SFTPTrigger arguments and classpath.

        Required for trigger persistence and recovery across Airflow restarts.
        Returns the trigger class path and initialization parameters.

        Returns:
            Tuple containing:
            - Class path string for trigger reconstruction
            - Dictionary of initialization parameters
        """

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Make asynchronous calls to SFTP server and yield trigger events.

        Continuously monitors SFTP server for file conditions using
        SFTPHookAsync. Handles different monitoring scenarios:
        - Direct file path monitoring when no pattern is specified
        - Pattern-based file matching when file_pattern is provided
        - Modification time filtering when newer_than is specified

        Yields:
            TriggerEvent objects indicating success/failure and found files

        Raises:
            AirflowException: For connection failures or configuration errors
        """

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Basic deferrable SFTP monitoring: waiting for a single known file path.
dag = DAG(
    'sftp_deferrable_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
)

# Deferrable sensor automatically uses SFTPTrigger
deferrable_sensor = SFTPSensor(
    task_id='wait_for_file',
    path='/remote/data/important_file.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Automatically uses SFTPTrigger
    timeout=3600,  # 1 hour timeout
    dag=dag,
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Pattern-based deferrable monitoring: match daily export files by name.
dag = DAG(
    'sftp_deferrable_pattern',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)

# Monitor for pattern-matched files asynchronously
pattern_sensor = SFTPSensor(
    task_id='wait_for_daily_files',
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,  # Uses SFTPTrigger internally
    timeout=14400,  # 4 hour timeout
    dag=dag,
)

from airflow import DAG
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import TriggerEvent
from datetime import datetime, timedelta


class CustomSFTPSensor(BaseSensorOperator):
    """Custom sensor using SFTPTrigger directly."""

    def __init__(self, sftp_path, sftp_conn_id='sftp_default', **kwargs):
        super().__init__(**kwargs)
        self.sftp_path = sftp_path
        self.sftp_conn_id = sftp_conn_id

    def execute(self, context):
        """Defer to custom trigger configuration."""
        self.defer(
            trigger=SFTPTrigger(
                path=self.sftp_path,
                file_pattern="*.csv",
                sftp_conn_id=self.sftp_conn_id,
                poke_interval=10.0,  # Custom interval
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        """Handle trigger completion.

        Raises a plain Exception for any non-success event, including a
        missing event payload (event may be None).
        """
        # Guard against event=None before indexing into it.
        if event and event["status"] == "success":
            # Lazy %-style args avoid formatting when the level is disabled.
            self.log.info("Files found: %s", event['files'])
            return event["files"]
        raise Exception(f"Trigger failed: {event}")


dag = DAG(
    'custom_sftp_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
)

custom_sensor = CustomSFTPSensor(
    task_id='custom_sftp_monitor',
    sftp_path='/remote/custom_data',
    sftp_conn_id='sftp_custom',
    dag=dag,
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# High-frequency monitoring: short schedule with concurrent runs allowed.
dag = DAG(
    'sftp_high_frequency',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(minutes=15),
    max_active_runs=5,  # Allow multiple concurrent runs
)

# High-frequency monitoring with short poke intervals
high_freq_sensor = SFTPSensor(
    task_id='high_freq_monitor',
    path='/remote/realtime_data',
    file_pattern='realtime_*.json',
    newer_than='{{ ts }}',  # Only files newer than task start
    sftp_conn_id='sftp_realtime',
    deferrable=True,
    timeout=900,  # 15 minutes timeout
    # Note: poke_interval is configured in the trigger
    dag=dag,
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def process_multiple_patterns(**context):
    """Process results from multiple pattern sensors.

    Pulls each sensor's XCom result (assumed to be a dict with a
    'files_found' list when files were detected — confirm against the
    sensor's return shape) and flattens them into one list.
    """
    ti = context['task_instance']
    all_files = []
    # Same pull-and-extend logic for all three sensors; loop instead of
    # three copy-pasted statements.
    for upstream_task_id in ('wait_for_csv', 'wait_for_json', 'wait_for_xml'):
        result = ti.xcom_pull(task_ids=upstream_task_id)
        if result:
            all_files.extend(result.get('files_found', []))
    print(f"Found {len(all_files)} total files to process")
    return all_files


dag = DAG(
    'sftp_multiple_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),
)

# Monitor for CSV files
csv_sensor = SFTPSensor(
    task_id='wait_for_csv',
    path='/remote/exports',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag,
)

# Monitor for JSON files
json_sensor = SFTPSensor(
    task_id='wait_for_json',
    path='/remote/exports',
    file_pattern='*.json',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag,
)

# Monitor for XML files
xml_sensor = SFTPSensor(
    task_id='wait_for_xml',
    path='/remote/exports',
    file_pattern='*.xml',
    sftp_conn_id='sftp_default',
    deferrable=True,
    dag=dag,
)

# Process all found files
process_files = PythonOperator(
    task_id='process_all_files',
    python_callable=process_multiple_patterns,
    dag=dag,
)

[csv_sensor, json_sensor, xml_sensor] >> process_files

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Time-based monitoring: only accept files modified after a cutoff.
dag = DAG(
    'sftp_time_based_trigger',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)

# Wait for files newer than specific time with deferrable execution
time_based_sensor = SFTPSensor(
    task_id='wait_for_recent_files',
    path='/remote/time_sensitive',
    file_pattern='data_*.parquet',
    newer_than='{{ ds }}T08:00:00',  # Files from today after 8 AM
    sftp_conn_id='sftp_default',
    deferrable=True,
    timeout=28800,  # 8 hour timeout
    dag=dag,
)

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta


def handle_sensor_success(**context):
    """Handle successful file detection.

    Reads the sensor's XCom result and summarizes it; tolerates either a
    dict payload with 'files_found' or any other return value.
    """
    task_instance = context['task_instance']
    sensor_result = task_instance.xcom_pull(task_ids='deferrable_file_sensor')
    if isinstance(sensor_result, dict) and 'files_found' in sensor_result:
        files = sensor_result['files_found']
        print(f"Successfully found {len(files)} files: {files}")
        return {"status": "success", "file_count": len(files)}
    else:
        print(f"Sensor completed with result: {sensor_result}")
        return {"status": "completed", "result": sensor_result}


dag = DAG(
    'sftp_trigger_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4),
)

# Deferrable sensor with comprehensive error handling
deferrable_sensor = SFTPSensor(
    task_id='deferrable_file_sensor',
    path='/remote/critical_data',
    file_pattern='critical_*.csv',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_critical',
    deferrable=True,
    timeout=14400,  # 4 hours
    retries=2,
    retry_delay=timedelta(minutes=30),
    dag=dag,
)

# Handle successful completion
success_handler = PythonOperator(
    task_id='handle_success',
    python_callable=handle_sensor_success,
    dag=dag,
)

# Send failure notification; trigger_rule='one_failed' fires this only
# when the upstream sensor fails.
failure_notification = EmailOperator(
    task_id='failure_notification',
    to=['ops@company.com'],
    subject='SFTP Monitoring Failed - {{ ds }}',
    html_content='''
    <h3>SFTP Trigger Monitoring Failure</h3>
    <p>The deferrable SFTP sensor failed to find required files.</p>
    <p>Task: {{ task.task_id }}</p>
    <p>Execution Date: {{ ds }}</p>
    ''',
    trigger_rule='one_failed',
    dag=dag,
)

deferrable_sensor >> success_handler
deferrable_sensor >> failure_notification

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Configure for resource efficiency
dag = DAG(
    'sftp_batch_efficient',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
    max_active_runs=10,  # Allow many concurrent deferrable tasks
    catchup=False,
)

# Multiple sensors running concurrently with minimal resource usage;
# build all five in one comprehension instead of an append loop.
sensors = [
    SFTPSensor(
        task_id=f'monitor_batch_{batch_idx}',
        path=f'/remote/batch_{batch_idx}',
        file_pattern='*.csv',
        sftp_conn_id='sftp_default',
        deferrable=True,  # Each sensor uses minimal resources
        timeout=3600,
        dag=dag,
    )
    for batch_idx in range(5)
]
# All sensors can run concurrently without consuming worker slots

When a deferrable sensor is executed:
1. The sensor's execute() method calls self.defer().
2. An SFTPTrigger instance is created with the specified parameters.
3. The trigger is persisted via its serialize() method.
4. The trigger's run() method connects through SFTPHookAsync and yields TriggerEvent objects when conditions are met or timeouts occur.

When the trigger completes, it emits a TriggerEvent with a success/failure status and the sensor's execute_complete() method is called. Tune poke_interval values to balance responsiveness with server load.

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-sftp