SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations
```
npx @tessl/cli install tessl/pypi-apache-airflow-providers-sftp@4.11.0
```

A comprehensive Apache Airflow provider package for SSH File Transfer Protocol (SFTP) operations. This provider enables secure file transfers and remote file system operations within Airflow workflows, offering hooks for connectivity, operators for file transfer tasks, sensors for monitoring file presence, triggers for asynchronous operations, and decorators for simplified task creation.
```
pip install apache-airflow-providers-sftp
```

Requires apache-airflow>=2.8.0, apache-airflow-providers-ssh>=2.1.0, paramiko>=2.9.0, and asyncssh>=2.12.0.

```python
from airflow.providers.sftp.hooks.sftp import SFTPHook, SFTPHookAsync
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task
```
```python
from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Create DAG instance (Airflow 2.4+ uses `schedule` rather than the
# deprecated `schedule_interval`)
dag = DAG(
    'sftp_example',
    default_args={'retries': 1},
    schedule=timedelta(hours=1),
    start_date=datetime(2023, 1, 1),
)

# Monitor for file existence
sensor = SFTPSensor(
    task_id='check_file_exists',
    path='/remote/path/data.csv',
    sftp_conn_id='sftp_default',
    dag=dag,
)

# Download file from SFTP server
download = SFTPOperator(
    task_id='download_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/path/data.csv',
    remote_filepath='/remote/path/data.csv',
    operation=SFTPOperation.GET,
    dag=dag,
)

# Upload file to SFTP server
upload = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/path/processed_data.csv',
    remote_filepath='/remote/path/processed_data.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag,
)

sensor >> download >> upload
```

The SFTP provider follows Airflow's standard provider architecture with specialized components:
Connection management is handled through Airflow's connection system with the sftp connection type, supporting authentication via SSH keys, passwords, and various security configurations.
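As a sketch of a key-based setup, a connection can be built programmatically; the host, login, and `extra` fields below are illustrative placeholders (`key_file` and `no_host_key_check` are SSH connection extras, not values specific to this provider):

```python
from airflow.models.connection import Connection

# Hedged sketch: a key-based SFTP connection; host, login, and the
# key_file path in `extra` are placeholders.
conn = Connection(
    conn_id="sftp_default",
    conn_type="sftp",
    host="sftp.example.com",
    login="user",
    port=22,
    extra='{"key_file": "/path/to/id_rsa", "no_host_key_check": true}',
)
print(conn.get_uri())  # URI form usable with AIRFLOW_CONN_* environment variables
```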
Core connectivity and file system operations including connection management, directory operations, file transfers, and path utilities. Both synchronous and asynchronous hooks are available for different operational patterns.
```python
class SFTPHook(SSHHook):
    def get_conn(self) -> paramiko.SFTPClient: ...
    def close_conn(self) -> None: ...
    def list_directory(self, path: str) -> list[str]: ...
    def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None: ...
    def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None: ...

class SFTPHookAsync(BaseHook):
    async def list_directory(self, path: str = "") -> list[str] | None: ...
    async def get_mod_time(self, path: str) -> str: ...
```
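For example, listing a remote directory and downloading a file with the synchronous hook; the connection id and paths are illustrative:

```python
from airflow.providers.sftp.hooks.sftp import SFTPHook

# Illustrative values: "sftp_default" and the paths below are placeholders.
hook = SFTPHook(ssh_conn_id="sftp_default")
try:
    for name in hook.list_directory("/remote/path"):
        print(name)
    # prefetch=True (the default) enables paramiko read-ahead for faster downloads
    hook.retrieve_file("/remote/path/data.csv", "/local/path/data.csv")
finally:
    hook.close_conn()
```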
Task execution components for uploading and downloading files between local and remote SFTP locations, with support for batch operations, intermediate directory creation, and connection pooling.

```python
class SFTPOperator(BaseOperator):
    def __init__(
        self,
        *,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        ssh_conn_id: str | None = None,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None: ...

class SFTPOperation:
    PUT = "put"
    GET = "get"
```
Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers, supporting both blocking and deferrable execution modes.

```python
class SFTPSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...
```
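For instance, a deferrable sensor watching for any CSV file matching a pattern; the path and connection id are illustrative:

```python
from airflow.providers.sftp.sensors.sftp import SFTPSensor

# Illustrative sketch: waits for any file matching *.csv under /remote/incoming,
# deferring to the triggerer instead of holding a worker slot while polling.
wait_for_csv = SFTPSensor(
    task_id="wait_for_csv",
    path="/remote/incoming",
    file_pattern="*.csv",
    sftp_conn_id="sftp_default",
    deferrable=True,
)
```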
Trigger components for deferrable SFTP operations that enable efficient resource utilization by yielding control during long-running file monitoring operations.

```python
class SFTPTrigger(BaseTrigger):
    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None: ...

    async def run(self) -> AsyncIterator[TriggerEvent]: ...
```
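The trigger is normally used internally by the deferrable sensor, but a custom operator can defer to it explicitly. A hedged sketch, using Airflow's standard `self.defer()` mechanism; the class name, path, and pattern are hypothetical:

```python
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator

# Hypothetical sketch of manual deferral: self.defer() suspends the task and
# resumes it at `method_name` once the trigger yields a TriggerEvent.
class DeferringSFTPSensor(BaseSensorOperator):
    def execute(self, context):
        self.defer(
            trigger=SFTPTrigger(path="/remote/incoming", file_pattern="*.csv"),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # `event` carries the TriggerEvent payload emitted by the trigger
        return event
```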
Simplified interfaces for creating SFTP-based tasks using Python decorators, enabling more readable and maintainable DAG definitions for common SFTP operations.

```python
def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...
```
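A sketch of the decorator form, assuming the provider registers this decorator under `task.sftp_sensor`; the function body, path, and connection id are placeholders:

```python
from airflow.decorators import task

# Hedged sketch: the decorated callable is evaluated as part of the
# sensor's poke cycle; path and connection id below are placeholders.
@task.sftp_sensor(path="/remote/path/data.csv", sftp_conn_id="sftp_default")
def file_ready() -> bool:
    return True
```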
SFTP connections are configured through Airflow's connection management system. The connection type is `sftp`, and the default connection id is `sftp_default`.
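A minimal sketch of supplying a connection without the UI, via Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention; the credentials and host are placeholders:

```python
import os

# Minimal sketch: Airflow also reads connections from AIRFLOW_CONN_<CONN_ID>
# environment variables; user, password, and host below are placeholders.
os.environ["AIRFLOW_CONN_SFTP_DEFAULT"] = "sftp://user:password@sftp.example.com:22"
```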
The signatures above reference these supporting types:

```python
from typing import Callable, Sequence, Any, AsyncIterator
from datetime import datetime
from paramiko import SFTPClient
from asyncssh.sftp import SFTPName
from airflow.triggers.base import TriggerEvent
from airflow.sensors.base import PokeReturnValue
from airflow.decorators.base import TaskDecorator
```