SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations
—
Task execution components for uploading and downloading files between local and remote SFTP locations. The SFTP operator provides robust file transfer capabilities with support for batch operations, intermediate directory creation, and comprehensive error handling.
Operation type constants for specifying the direction of file transfers.
class SFTPOperation:
    """Operation that can be used with SFTP."""

    PUT = "put"  # Upload operation from local to remote
    GET = "get"  # Download operation from remote to local
Main operator for transferring files between local and remote SFTP locations.
class SFTPOperator(BaseOperator):
    """
    SFTPOperator for transferring files from a remote host to local or vice versa.

    This operator uses an SFTP hook to open the SFTP transport channel that
    serves as the basis for file transfer operations. Supports both single-file
    and batch file transfers with comprehensive configuration options.
    """

    template_fields: Sequence[str] = ("local_filepath", "remote_filepath", "remote_host")
    def __init__(
        self,
        *,
        ssh_hook: SSHHook | None = None,
        sftp_hook: SFTPHook | None = None,
        ssh_conn_id: str | None = None,
        remote_host: str | None = None,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        confirm: bool = True,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize the SFTP operator.

        Parameters:
        - ssh_conn_id: SSH connection ID from Airflow connections
        - sftp_hook: Predefined SFTPHook to use (preferred over ssh_conn_id)
        - ssh_hook: Deprecated - predefined SSHHook to use (use sftp_hook instead)
        - remote_host: Remote host to connect to (templated)
        - local_filepath: Local file path or list of paths to get or put (templated)
        - remote_filepath: Remote file path or list of paths to get or put (templated)
        - operation: Specify operation 'get' or 'put' (default: 'put')
        - confirm: Specify if the SFTP operation should be confirmed (default: True)
        - create_intermediate_dirs: Create missing intermediate directories (default: False)
        """
    def execute(self, context: Any) -> str | list[str] | None:
        """Execute the file transfer operation."""

    def get_openlineage_facets_on_start(self):
        """Return OpenLineage datasets for lineage tracking."""

def execute(self, context: Any) -> str | list[str] | None:
"""
Execute the file transfer operation (PUT or GET).
Validates file path arrays, establishes SFTP connection, and performs
the specified transfer operation with optional intermediate directory creation.
Parameters:
- context: Airflow task execution context
Returns:
Filepath or list of filepaths that were transferred, or None
Raises:
ValueError: If local_filepath and remote_filepath arrays have different lengths
TypeError: If operation is not 'get' or 'put'
AirflowException: If both ssh_hook and sftp_hook are defined, or transfer fails
"""def get_openlineage_facets_on_start(self):
"""
Return OpenLineage datasets for data lineage tracking.
Creates dataset facets for both local and remote file locations
to enable lineage tracking in OpenLineage-compatible systems.
Returns:
OpenLineage facets containing input and output datasets
"""from airflow import DAG
Example: upload a single local file to a remote SFTP server.
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime
dag = DAG(
    'sftp_upload_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)

upload_task = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/data/report.csv',
    remote_filepath='/remote/uploads/report.csv',
    operation=SFTPOperation.PUT,
    dag=dag,
)
Example: download a single remote file to the local filesystem.
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime
dag = DAG(
    'sftp_download_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)

download_task = SFTPOperator(
    task_id='download_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/downloads/data.csv',
    remote_filepath='/remote/exports/data.csv',
    operation=SFTPOperation.GET,
    dag=dag,
)
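In the GET direction the parameter roles reverse: remote_filepath names the source on the server and local_filepath the destination on the worker; only the operation flag changes between upload and download.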
Example: transfer multiple files in one task by passing equal-length path lists.
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta
dag = DAG(
    'sftp_batch_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)
# Upload multiple files
batch_upload = SFTPOperator(
    task_id='batch_upload',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv',
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv',
    ],
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag,
)
# Download multiple files
batch_download = SFTPOperator(
    task_id='batch_download',
    ssh_conn_id='sftp_default',
    local_filepath=[
        '/local/downloads/result1.json',
        '/local/downloads/result2.json',
    ],
    remote_filepath=[
        '/remote/results/result1.json',
        '/remote/results/result2.json',
    ],
    operation=SFTPOperation.GET,
    dag=dag,
)
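Both path lists must have the same length; the operator pairs entries element-wise, and execute() raises ValueError on a mismatch, as shown in the sketch earlier.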
Example: use a predefined SFTPHook, a remote_host override, and templated paths.
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.hooks.sftp import SFTPHook
from datetime import datetime
dag = DAG(
    'sftp_advanced_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)
# Use a predefined hook for custom connection configuration
custom_hook = SFTPHook(ssh_conn_id='sftp_custom')
advanced_transfer = SFTPOperator(
    task_id='advanced_transfer',
    sftp_hook=custom_hook,  # Use the predefined hook instead of ssh_conn_id
    remote_host='custom.sftp.server.com',  # Override the connection's host
    local_filepath='/local/data/{{ ds }}/report.csv',  # Templated path
    remote_filepath='/remote/daily/{{ ds }}/report.csv',  # Templated path
    operation=SFTPOperation.PUT,
    confirm=True,  # Confirm the transfer completed successfully
    create_intermediate_dirs=True,  # Create date-based directories as needed
    dag=dag,
)
Example: rely on Airflow's task-level retry settings for resilient transfers.
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'sftp_resilient_example',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=6),
)
resilient_transfer = SFTPOperator(
    task_id='resilient_transfer',
    ssh_conn_id='sftp_prod',
    local_filepath='/local/critical/data.parquet',
    remote_filepath='/remote/warehouse/data.parquet',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    # Airflow retries this task on connection or transfer failures per default_args
    dag=dag,
)
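The resilience here comes from Airflow's task-level retry settings in default_args rather than from the operator itself: a failed connection or transfer raises AirflowException, which the scheduler handles according to the retry policy.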
Example: combine SFTPSensor with SFTPOperator to wait for, download, and forward a file.
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta
dag = DAG(
    'sftp_sensor_integration',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(hours=1),
)
# Wait for source file to appear
wait_for_file = SFTPSensor(
    task_id='wait_for_source',
    path='/remote/incoming/data.csv',
    sftp_conn_id='sftp_source',
    timeout=3600,  # Wait up to 1 hour
    poke_interval=300,  # Check every 5 minutes
    dag=dag,
)
# Download the file once it's available
download_file = SFTPOperator(
    task_id='download_data',
    ssh_conn_id='sftp_source',
    local_filepath='/local/staging/data.csv',
    remote_filepath='/remote/incoming/data.csv',
    operation=SFTPOperation.GET,
    dag=dag,
)
# Upload the processed file to a different server
upload_processed = SFTPOperator(
    task_id='upload_processed',
    ssh_conn_id='sftp_destination',
    local_filepath='/local/processed/data.csv',
    remote_filepath='/remote/processed/data.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)

wait_for_file >> download_file >> upload_processed
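Because the source and destination are different servers, the pipeline uses two connections (sftp_source and sftp_destination); any processing step between the download and the upload would run as its own task in the middle of this chain.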
Example: build run-specific paths with Airflow templating.
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from datetime import datetime, timedelta
dag = DAG(
    'sftp_templating_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
)
templated_transfer = SFTPOperator(
    task_id='templated_transfer',
    ssh_conn_id='sftp_default',
    # Use Airflow templating for dynamic, run-specific paths
    local_filepath='/local/data/{{ ds }}/export_{{ ts_nodash }}.csv',
    remote_filepath='/remote/daily/{{ ds }}/export_{{ ts_nodash }}.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)
Best practices:
- Prefer the sftp_hook parameter over the deprecated ssh_hook
- Use create_intermediate_dirs=True for dynamic directory structures
- Use confirm=True to verify successful transfers
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-sftp