tessl/pypi-apache-airflow-providers-ftp

Provider package for Apache Airflow that adds FTP support, including hooks, operators, and sensors for workflow integration.


File Transfer Operations

Operator classes for uploading and downloading files between the local filesystem and remote FTP servers, with Airflow integration, templating support, and OpenLineage compatibility for data lineage tracking.

Capabilities

FTP Operation Constants

Constants defining supported file transfer operations.

class FTPOperation:
    """Operation types for FTP file transfers."""
    
    PUT = "put"  # Upload files to remote server
    GET = "get"  # Download files from remote server

FTP File Transfer Operator

Primary operator for transferring files between the local filesystem and an FTP server, supporting single files or batch operations.

class FTPFileTransmitOperator(BaseOperator):
    """
    Transfer files between local and remote FTP locations.
    
    Template Fields: ("local_filepath", "remote_filepath")
    
    Parameters:
    - ftp_conn_id (str): FTP connection ID (default: "ftp_default")
    - local_filepath (str | list[str]): Local file path(s)
    - remote_filepath (str | list[str]): Remote file path(s)
    - operation (str): Transfer direction - FTPOperation.PUT or FTPOperation.GET
    - create_intermediate_dirs (bool): Create missing directories (default: False)
    """
    
    template_fields: Sequence[str] = ("local_filepath", "remote_filepath")
    
    def __init__(
        self,
        *,
        ftp_conn_id: str = "ftp_default",
        local_filepath: str | list[str],
        remote_filepath: str | list[str],
        operation: str = FTPOperation.PUT,
        create_intermediate_dirs: bool = False,
        **kwargs
    ) -> None: ...
    
    def execute(self, context: Any) -> str | list[str] | None:
        """
        Execute file transfer operation.
        
        Parameters:
        - context (Any): Airflow task context
        
        Returns:
        str | list[str] | None: Local filepath(s) after operation
        """
    
    def get_openlineage_facets_on_start(self):
        """
        Return OpenLineage datasets for data lineage tracking.
        
        Returns:
        OperatorLineage: Input and output datasets for lineage
        """
    
    @cached_property
    def hook(self) -> FTPHook:
        """
        Create and return FTPHook instance.
        
        Returns:
        FTPHook: Configured FTP hook
        """

FTPS File Transfer Operator

Secure file transfer operator using FTPS (FTP over SSL/TLS) for encrypted file transfers.

class FTPSFileTransmitOperator(FTPFileTransmitOperator):
    """
    Transfer files using FTPS (FTP over SSL/TLS) for encrypted transfers.
    
    Inherits all FTPFileTransmitOperator functionality with SSL/TLS encryption.
    """
    
    @cached_property
    def hook(self) -> FTPSHook:
        """
        Create and return FTPSHook instance.
        
        Returns:
        FTPSHook: Configured FTPS hook with SSL/TLS support
        """

Usage Examples

Single File Upload

from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from datetime import datetime

dag = DAG('ftp_upload_example', start_date=datetime(2023, 1, 1))

upload_task = FTPFileTransmitOperator(
    task_id='upload_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/output.csv',
    remote_filepath='/remote/uploads/output.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag
)

Single File Download

from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

download_task = FTPFileTransmitOperator(
    task_id='download_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/input.csv',
    remote_filepath='/remote/data/input.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,  # Create /local/data/ if it doesn't exist
    dag=dag
)

Batch File Operations

from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

# Upload multiple files in one operation
batch_upload = FTPFileTransmitOperator(
    task_id='batch_upload_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv'
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv'
    ],
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

# Download multiple files in one operation
batch_download = FTPFileTransmitOperator(
    task_id='batch_download_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/downloads/report1.pdf',
        '/local/downloads/report2.pdf'
    ],
    remote_filepath=[
        '/remote/reports/report1.pdf',
        '/remote/reports/report2.pdf'
    ],
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

Secure File Transfer with FTPS

from airflow.providers.ftp.operators.ftp import FTPSFileTransmitOperator, FTPOperation

secure_upload = FTPSFileTransmitOperator(
    task_id='secure_upload',
    ftp_conn_id='my_secure_ftp_connection',  # Connection configured for FTPS
    local_filepath='/local/sensitive/data.xml',
    remote_filepath='/remote/secure/data.xml',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

Template Usage with Airflow Variables

from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation

# Using templated file paths with Airflow variables and macros
templated_transfer = FTPFileTransmitOperator(
    task_id='templated_transfer',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/{{ ds }}/report.csv',  # Uses execution date
    remote_filepath='/remote/reports/{{ ds }}/report.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)
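Airflow renders the template fields with Jinja at runtime. As a rough sketch of what happens (the real rendering is done by Airflow's templating engine, not by user code), the `{{ ds }}` macro expands to the task's logical date formatted as `YYYY-MM-DD`:

```python
from datetime import date

# Sketch only: Airflow's Jinja engine substitutes {{ ds }} with the
# logical (execution) date in YYYY-MM-DD form before execute() runs.
ds = date(2023, 1, 1).isoformat()

local_filepath = f"/local/data/{ds}/report.csv"
remote_filepath = f"/remote/reports/{ds}/report.csv"

print(local_filepath)   # /local/data/2023-01-01/report.csv
print(remote_filepath)  # /remote/reports/2023-01-01/report.csv
```

Because `local_filepath` and `remote_filepath` are template fields, any Jinja macro or Airflow variable available in the task context can be used in the same way.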

Complete ETL Pipeline Example

from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.sensors.ftp import FTPSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_data():
    # Data processing logic here
    print("Processing downloaded data...")
    return "Data processed successfully"

dag = DAG(
    'ftp_etl_pipeline', 
    start_date=datetime(2023, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False
)

# Wait for input file to arrive
wait_for_input = FTPSensor(
    task_id='wait_for_input_file',
    path='/remote/input/{{ ds }}/data.csv',
    ftp_conn_id='source_ftp',
    poke_interval=300,  # Check every 5 minutes
    timeout=3600,       # Timeout after 1 hour
    dag=dag
)

# Download input file
download_input = FTPFileTransmitOperator(
    task_id='download_input_file',
    ftp_conn_id='source_ftp',
    local_filepath='/local/staging/{{ ds }}/input.csv',
    remote_filepath='/remote/input/{{ ds }}/data.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

# Process the data
process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

# Upload processed results
upload_results = FTPFileTransmitOperator(
    task_id='upload_results',
    ftp_conn_id='destination_ftp',
    local_filepath='/local/output/{{ ds }}/processed_data.csv',
    remote_filepath='/remote/output/{{ ds }}/processed_data.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

# Define task dependencies
wait_for_input >> download_input >> process_task >> upload_results

Error Handling

The operators raise errors for the following conditions:

  • ValueError: Raised when local and remote filepath arrays have different lengths
  • TypeError: Raised for unsupported operation values (not GET or PUT)
  • FTP Errors: Propagated from underlying FTPHook operations
  • File System Errors: Raised when local directories cannot be created or accessed
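A minimal, standalone sketch of the length check behind the ValueError. The operator performs an equivalent validation inside `execute`; the function name here is illustrative, not part of the provider's API:

```python
def check_filepaths(local_filepath, remote_filepath):
    """Illustrative sketch of the operator's filepath validation."""
    # Normalize single paths to one-element lists, mirroring the
    # str | list[str] parameter types.
    local = local_filepath if isinstance(local_filepath, list) else [local_filepath]
    remote = remote_filepath if isinstance(remote_filepath, list) else [remote_filepath]
    if len(local) != len(remote):
        raise ValueError(
            f"{len(local)} local path(s) but {len(remote)} remote path(s): "
            "both lists must have the same length"
        )
    return list(zip(local, remote))

# Matched lengths pair up element-wise:
print(check_filepaths("/local/a.csv", "/remote/a.csv"))

# Mismatched lengths raise ValueError:
try:
    check_filepaths(["/local/a.csv", "/local/b.csv"], ["/remote/a.csv"])
except ValueError as exc:
    print(exc)
```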

OpenLineage Integration

When OpenLineage providers are available, the operators automatically generate data lineage information:

  • Input Datasets: Source file locations (local for PUT, remote for GET)
  • Output Datasets: Destination file locations (remote for PUT, local for GET)
  • Namespace Format: file://hostname:port for proper dataset identification

This enables comprehensive data lineage tracking across FTP file transfer operations within your data pipelines.
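As a hedged sketch of the namespace format described above (the exact string construction is internal to the provider's OpenLineage facets; the helper name here is hypothetical):

```python
def ftp_dataset_namespace(hostname, port=None):
    """Illustrative: build a file://hostname:port namespace string
    of the kind used to identify FTP datasets in OpenLineage."""
    if port:
        return f"file://{hostname}:{port}"
    return f"file://{hostname}"

print(ftp_dataset_namespace("ftp.example.com", 21))  # file://ftp.example.com:21
print(ftp_dataset_namespace("ftp.example.com"))      # file://ftp.example.com
```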

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-ftp
