SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations
—
Quality: Pending — best-practices review has not yet been completed.
Impact: Pending — no eval scenarios have been run.
Simplified interfaces for creating SFTP-based tasks using Python decorators. The SFTP provider includes task decorators that enable more readable and maintainable DAG definitions for common SFTP operations, particularly file monitoring scenarios.
Task decorator for creating SFTP sensor tasks with simplified syntax and enhanced functionality.
def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator:
"""
Wrap a function into an Airflow SFTP sensor operator.
Creates a decorated task that combines SFTP file monitoring with custom
Python processing logic. The decorated function receives files_found
in its keyword arguments when files are detected.
Parameters:
- python_callable: Function to decorate and execute when files are found
- **kwargs: Additional arguments passed to the underlying SFTPSensor
Returns:
TaskDecorator that creates _DecoratedSFTPSensor instances
"""Internal implementation class for decorated SFTP sensor tasks.
class _DecoratedSFTPSensor(SFTPSensor):
"""
Wraps a Python callable and captures args/kwargs when called for execution.
Combines SFTP file monitoring capabilities with custom Python processing.
Inherits all SFTPSensor functionality while adding decorator-specific
handling for Python callable execution.
"""
template_fields: Sequence[str] = ("op_args", "op_kwargs", *SFTPSensor.template_fields)
custom_operator_name = "@task.sftp_sensor"
shallow_copy_attrs: Sequence[str] = ("python_callable",)
def __init__(
self,
*,
task_id: str,
**kwargs,
) -> None:
"""
Initialize decorated SFTP sensor.
Parameters:
- task_id: Unique task identifier
- **kwargs: Arguments passed to parent SFTPSensor class
"""from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

# Hourly example DAG demonstrating the basic @sftp_sensor_task usage.
dag = DAG(
    'sftp_decorator_basic',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1)
)

# Watch /remote/data for CSV files over the 'sftp_default' connection; the
# decorated function below receives files_found once matching files appear.
@sftp_sensor_task(
    path='/remote/data',
    file_pattern='*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def process_csv_files(files_found, **context):
    """Process CSV files when they are found.

    Parameters:
    - files_found: list of remote file paths matched by the sensor.
    - **context: Airflow task context (unused here).

    Returns a summary dict with a status flag, the processed-file count,
    and the list of processed file names.
    """
    print(f"Found {len(files_found)} CSV files: {files_found}")
    # Custom processing logic
    processed_files = []
    for file_path in files_found:
        print(f"Processing file: {file_path}")
        # Add your file processing logic here
        processed_files.append(f"processed_{file_path}")
    return {
        "status": "success",
        "processed_count": len(processed_files),
        "processed_files": processed_files
    }

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime, timedelta
import json

# Daily example DAG combining templated sensor parameters with direct
# SFTPHook usage inside the decorated callable.
dag = DAG(
    'sftp_decorator_advanced',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

# Wait for today's JSON exports, matching only files newer than the
# templated cutoff time.
@sftp_sensor_task(
    path='/remote/daily_exports',
    file_pattern='export_{{ ds_nodash }}_*.json',
    newer_than='{{ ds }}T00:00:00',
    sftp_conn_id='sftp_default',
    timeout=7200,  # 2 hours
    poke_interval=300,  # 5 minutes
    dag=dag
)
def validate_and_download_exports(files_found, **context):
    """Validate JSON exports and download them for processing.

    For each found file, look up its size on the SFTP server and keep it
    only if it exceeds a minimum-size sanity threshold.

    Returns a dict with valid/invalid file lists and a count summary.
    """
    hook = SFTPHook(ssh_conn_id='sftp_default')
    validated_files = []
    invalid_files = []
    try:
        for file_path in files_found:
            try:
                # Get the directory listing to read this file's size.
                directory, filename = file_path.rsplit('/', 1)
                file_info = hook.describe_directory(directory)
                if filename in file_info:
                    file_size = file_info[filename]['size']
                    if file_size > 100:  # Minimum size check
                        validated_files.append(file_path)
                        print(f"Valid file: {file_path} ({file_size} bytes)")
                    else:
                        invalid_files.append(file_path)
                        print(f"Invalid file (too small): {file_path} ({file_size} bytes)")
                else:
                    # File vanished between detection and validation; count it
                    # as invalid so the summary totals still add up.
                    invalid_files.append(file_path)
                    print(f"File not found during validation: {file_path}")
            except Exception as e:
                print(f"Error validating {file_path}: {e}")
                invalid_files.append(file_path)
    finally:
        # Always release the SFTP connection, even if validation fails.
        hook.close_conn()
    return {
        "valid_files": validated_files,
        "invalid_files": invalid_files,
        "validation_summary": {
            "total_found": len(files_found),
            "valid_count": len(validated_files),
            "invalid_count": len(invalid_files)
        }
    }

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

# Example DAG using the deferrable (async-trigger) sensor mode so the
# worker slot is released while waiting.
dag = DAG(
    'sftp_decorator_deferrable',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=2),
    max_active_runs=5
)

@sftp_sensor_task(
    path='/remote/realtime_data',
    file_pattern='sensor_data_*.parquet',
    newer_than='{{ ts }}',  # Only files newer than task execution time
    sftp_conn_id='sftp_realtime',
    deferrable=True,  # Use async trigger for resource efficiency
    timeout=3600,
    dag=dag
)
def process_sensor_data(files_found, **context):
    """Process real-time sensor data files.

    Extracts the timestamp embedded in each parquet filename and pushes
    the per-file results to XCom for downstream consumers.

    Returns a dict with the execution date, processed count, and results.
    """
    execution_date = context['ds']
    task_instance = context['task_instance']
    print(f"Processing sensor data for {execution_date}")
    print(f"Found {len(files_found)} files: {files_found}")
    # Simulate processing logic
    processing_results = []
    for file_path in files_found:
        # Extract timestamp from a name shaped like sensor_data_<ts>.parquet
        filename = file_path.split('/')[-1]
        if 'sensor_data_' in filename:
            timestamp = filename.replace('sensor_data_', '').replace('.parquet', '')
            processing_results.append({
                "file": file_path,
                "timestamp": timestamp,
                "status": "processed"
            })
    # Push results to XCom for downstream tasks
    task_instance.xcom_push(key='processing_results', value=processing_results)
    return {
        "execution_date": execution_date,
        "files_processed": len(processing_results),
        "processing_results": processing_results
    }

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

# Example DAG that monitors a directory of mixed file types.
dag = DAG(
    'sftp_decorator_patterns',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=4)
)

@sftp_sensor_task(
    path='/remote/mixed_data',
    file_pattern='*',  # Monitor all files
    sftp_conn_id='sftp_default',
    dag=dag
)
def categorize_files(files_found, **context):
    """Categorize found files by extension and log the breakdown.

    Returns a dict mapping category name ('csv_files', 'json_files',
    'xml_files', 'other_files') to the list of matching file paths.
    """
    categorized = {
        'csv_files': [],
        'json_files': [],
        'xml_files': [],
        'other_files': []
    }
    # Dispatch table: known extensions to their bucket; anything else
    # falls through to 'other_files'.
    buckets = {'.csv': 'csv_files', '.json': 'json_files', '.xml': 'xml_files'}
    for file_path in files_found:
        lowered = file_path.lower()
        for ext, bucket in buckets.items():
            if lowered.endswith(ext):
                categorized[bucket].append(file_path)
                break
        else:
            categorized['other_files'].append(file_path)
    # Log categorization results
    for category, files in categorized.items():
        if files:
            print(f"{category}: {len(files)} files")
            for path in files:
                print(f" - {path}")
    return categorized

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta

# Example DAG for critical files, with retries on the sensor task itself.
dag = DAG(
    'sftp_decorator_error_handling',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6)
)

@sftp_sensor_task(
    path='/remote/critical_files',
    file_pattern='critical_*.txt',
    newer_than='{{ ds }}T06:00:00',
    sftp_conn_id='sftp_critical',
    timeout=7200,
    retries=2,
    retry_delay=timedelta(minutes=15),
    dag=dag
)
def process_critical_files(files_found, **context):
    """Process critical files with comprehensive error handling.

    Fails the task when no files were found, and fails when more files
    fail processing than succeed; otherwise returns a summary dict that
    includes the success rate.

    Raises:
    - AirflowException: on empty input, majority failure, or an
      unexpected processing error.
    """
    if not files_found:
        raise AirflowException("No critical files found - this should not happen")
    try:
        processed_files = []
        failed_files = []
        for file_path in files_found:
            try:
                # Simulate file processing
                print(f"Processing critical file: {file_path}")
                # Add your critical file processing logic here
                # For example: data validation, format checking, etc.
                # Simulate processing success/failure
                if "invalid" not in file_path.lower():
                    processed_files.append(file_path)
                    print(f"Successfully processed: {file_path}")
                else:
                    failed_files.append(file_path)
                    print(f"Processing failed: {file_path}")
            except Exception as e:
                failed_files.append(file_path)
                print(f"Error processing {file_path}: {e}")
        # Check if any critical files failed
        if failed_files:
            error_msg = f"Failed to process {len(failed_files)} critical files: {failed_files}"
            print(error_msg)
            # Decide whether to fail the task or just warn
            if len(failed_files) > len(processed_files):
                raise AirflowException(error_msg)
        return {
            "total_files": len(files_found),
            "processed_files": processed_files,
            "failed_files": failed_files,
            # files_found is non-empty here, so no division by zero
            "success_rate": len(processed_files) / len(files_found) * 100
        }
    except AirflowException:
        # Let deliberate failures propagate without being re-wrapped,
        # so the original error message reaches the task log intact.
        raise
    except Exception as e:
        print(f"Critical error in file processing: {e}")
        raise AirflowException(f"Critical file processing failed: {e}")

from airflow import DAG
# Integration example: a decorated sensor feeding a classic PythonOperator.
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_downstream(**context):
    """Pull the decorated sensor's XCom result and run follow-up work.

    Returns a completion message string.
    """
    ti = context['task_instance']
    sensor_results = ti.xcom_pull(task_ids='monitor_data_files')
    print(f"Received sensor results: {sensor_results}")
    has_files = bool(sensor_results) and 'files_found' in sensor_results
    if has_files:
        files = sensor_results['files_found']
        print(f"Processing {len(files)} files downstream")
        # Add downstream processing logic
        for file_path in files:
            print(f"Downstream processing: {file_path}")
    return "Downstream processing complete"
dag = DAG(
    'sftp_decorator_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=3)
)

# Decorated sensor task; explicit task_id so downstream xcom_pull can
# reference it by name.
@sftp_sensor_task(
    task_id='monitor_data_files',
    path='/remote/data_pipeline',
    file_pattern='pipeline_*.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)
def monitor_and_validate(files_found, **context):
    """Keep only the CSV files among those the sensor found.

    Returns a dict with the raw file list, the validated subset, and the
    validated count.
    """
    validated_files = [path for path in files_found if path.endswith('.csv')]
    for path in validated_files:
        print(f"Validated: {path}")
    return {
        "files_found": files_found,
        "validated_files": validated_files,
        "validation_count": len(validated_files),
    }
# Downstream processing task
downstream_task = PythonOperator(
    task_id='downstream_processing',
    python_callable=process_downstream,
    dag=dag
)

# Set up task dependencies: run downstream processing after the sensor.
monitor_and_validate >> downstream_task

from airflow import DAG
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
from datetime import datetime, timedelta

# Example DAG showing Jinja templating in path, pattern, and newer_than.
dag = DAG(
    'sftp_decorator_templating',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1)
)

@sftp_sensor_task(
    path='/remote/daily/{{ ds }}',  # Templated path
    file_pattern='data_{{ ds_nodash }}_*.json',  # Templated pattern
    newer_than='{{ ds }}T05:00:00',  # Templated time
    sftp_conn_id='sftp_default',
    timeout=14400,
    dag=dag
)
def process_daily_data(files_found, **context):
"""Process daily data files using Airflow templating."""
execution_date = context['ds']
formatted_date = context['ds_nodash']
print(f"Processing daily data for {execution_date}")
print(f"Looking for pattern: data_{formatted_date}_*.json")
print(f"Found {len(files_found)} files")
daily_summary = {
"execution_date": execution_date,
"formatted_date": formatted_date,
"files_found": files_found,
"file_count": len(files_found)
}
# Process each file
for file_path in files_found:
print(f"Processing daily file: {file_path}")
# Add daily file processing logic
return daily_summaryfiles_found parameter injectionInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-sftp