or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

decorators.mdhooks.mdindex.mdoperators.mdsensors.mdtriggers.md
tile.json

tessl/pypi-apache-airflow-providers-sftp

SSH File Transfer Protocol (SFTP) provider for Apache Airflow with hooks, operators, sensors, triggers, and decorators for secure file operations

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/apache-airflow-providers-sftp@4.11.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-sftp@4.11.0

index.mddocs/

Apache Airflow SFTP Provider

A comprehensive Apache Airflow provider package for SSH File Transfer Protocol (SFTP) operations. This provider enables secure file transfers and remote file system operations within Airflow workflows, offering hooks for connectivity, operators for file transfer tasks, sensors for monitoring file presence, triggers for asynchronous operations, and decorators for simplified task creation.

Package Information

  • Package Name: apache-airflow-providers-sftp
  • Language: Python
  • Installation: pip install apache-airflow-providers-sftp
  • Dependencies: apache-airflow>=2.8.0, apache-airflow-providers-ssh>=2.1.0, paramiko>=2.9.0, asyncssh>=2.12.0

Core Imports

from airflow.providers.sftp.hooks.sftp import SFTPHook, SFTPHookAsync
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.providers.sftp.decorators.sensors.sftp import sftp_sensor_task

Basic Usage

from airflow import DAG
from airflow.providers.sftp.operators.sftp import SFTPOperator, SFTPOperation
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from datetime import datetime, timedelta

# Create DAG instance
dag = DAG(
    'sftp_example',
    default_args={'retries': 1},
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2023, 1, 1)
)

# Monitor for file existence
sensor = SFTPSensor(
    task_id='check_file_exists',
    path='/remote/path/data.csv',
    sftp_conn_id='sftp_default',
    dag=dag
)

# Download file from SFTP server
download = SFTPOperator(
    task_id='download_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/path/data.csv',
    remote_filepath='/remote/path/data.csv',
    operation=SFTPOperation.GET,
    dag=dag
)

# Upload file to SFTP server
upload = SFTPOperator(
    task_id='upload_file',
    ssh_conn_id='sftp_default',
    local_filepath='/local/path/processed_data.csv',
    remote_filepath='/remote/path/processed_data.csv',
    operation=SFTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

sensor >> download >> upload

Architecture

The SFTP provider follows Airflow's standard provider architecture with specialized components:

  • Hooks: Low-level interfaces for SFTP connections (synchronous and asynchronous)
  • Operators: Task execution components for file transfer operations
  • Sensors: Monitoring components for file presence and condition checking
  • Triggers: Asynchronous components for deferrable operations
  • Decorators: Simplified task creation interfaces for common patterns

Connection management is handled through Airflow's connection system with the sftp connection type, supporting authentication via SSH keys, passwords, and various security configurations.

Capabilities

SFTP Hooks

Core connectivity and file system operations including connection management, directory operations, file transfers, and path utilities. Both synchronous and asynchronous hooks are available for different operational patterns.

class SFTPHook(SSHHook):
    def get_conn(self) -> paramiko.SFTPClient: ...
    def close_conn(self) -> None: ...
    def list_directory(self, path: str) -> list[str]: ...
    def retrieve_file(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None: ...
    def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None: ...

class SFTPHookAsync(BaseHook):
    async def list_directory(self, path: str = "") -> list[str] | None: ...
    async def get_mod_time(self, path: str) -> str: ...

SFTP Hooks

File Transfer Operations

Task execution components for uploading and downloading files between local and remote SFTP locations, with support for batch operations, intermediate directory creation, and connection pooling.

class SFTPOperator(BaseOperator):
    def __init__(
        self,
        *,
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = SFTPOperation.PUT,
        ssh_conn_id: str | None = None,
        create_intermediate_dirs: bool = False,
        **kwargs,
    ) -> None: ...

class SFTPOperation:
    PUT = "put"
    GET = "get"

File Transfer Operations

File Monitoring

Sensor components for monitoring file and directory presence, modification times, and pattern matching on SFTP servers, supporting both blocking and deferrable execution modes.

class SFTPSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        path: str,
        file_pattern: str = "",
        newer_than: datetime | str | None = None,
        sftp_conn_id: str = "sftp_default",
        deferrable: bool = False,
        **kwargs,
    ) -> None: ...

File Monitoring

Asynchronous Triggers

Trigger components for deferrable SFTP operations that enable efficient resource utilization by yielding control during long-running file monitoring operations.

class SFTPTrigger(BaseTrigger):
    def __init__(
        self,
        path: str,
        file_pattern: str = "",
        sftp_conn_id: str = "sftp_default",
        newer_than: datetime | str | None = None,
        poke_interval: float = 5,
    ) -> None: ...
    
    async def run(self) -> AsyncIterator[TriggerEvent]: ...

Asynchronous Triggers

Task Decorators

Simplified interfaces for creating SFTP-based tasks using Python decorators, enabling more readable and maintainable DAG definitions for common SFTP operations.

def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...

Task Decorators

Connection Configuration

SFTP connections are configured through Airflow's connection management system:

  • Connection Type: sftp
  • Default Connection ID: sftp_default
  • Authentication: SSH keys, passwords, or both
  • Extra Configuration: Host key verification, known hosts, private keys

Types

from typing import Callable, Sequence, Any, AsyncIterator
from datetime import datetime
from paramiko import SFTPClient
from asyncssh.sftp import SFTPName
from airflow.triggers.base import TriggerEvent
from airflow.sensors.base import PokeReturnValue
from airflow.decorators.base import TaskDecorator