Provider package for Apache Airflow that enables FTP file transfer protocol operations including hooks, operators, and sensors for workflow integration.
84
Operator classes for performing file uploads, downloads, and transfers between local and remote FTP servers with Airflow integration, templating support, and OpenLineage compatibility for data lineage tracking.
Constants defining supported file transfer operations.
class FTPOperation:
"""Operation types for FTP file transfers."""
PUT = "put" # Upload files to remote server
GET = "get" # Download files from remote serverPrimary operator for transferring files between local filesystem and FTP servers with support for single files or batch operations.
class FTPFileTransmitOperator(BaseOperator):
"""
Transfer files between local and remote FTP locations.
Template Fields: ("local_filepath", "remote_filepath")
Parameters:
- ftp_conn_id (str): FTP connection ID (default: "ftp_default")
- local_filepath (str | list[str]): Local file path(s)
- remote_filepath (str | list[str]): Remote file path(s)
- operation (str): Transfer direction - FTPOperation.PUT or FTPOperation.GET
- create_intermediate_dirs (bool): Create missing directories (default: False)
"""
template_fields: Sequence[str] = ("local_filepath", "remote_filepath")
def __init__(
self,
*,
ftp_conn_id: str = "ftp_default",
local_filepath: str | list[str],
remote_filepath: str | list[str],
operation: str = FTPOperation.PUT,
create_intermediate_dirs: bool = False,
**kwargs
) -> None: ...
def execute(self, context: Any) -> str | list[str] | None:
"""
Execute file transfer operation.
Parameters:
- context (Any): Airflow task context
Returns:
str | list[str] | None: Local filepath(s) after operation
"""
def get_openlineage_facets_on_start(self):
"""
Return OpenLineage datasets for data lineage tracking.
Returns:
OperatorLineage: Input and output datasets for lineage
"""
@cached_property
def hook(self) -> FTPHook:
"""
Create and return FTPHook instance.
Returns:
FTPHook: Configured FTP hook
"""Secure file transfer operator using FTPS (FTP over SSL/TLS) for encrypted file transfers.
class FTPSFileTransmitOperator(FTPFileTransmitOperator):
    """
    Transfer files using FTPS (FTP over SSL/TLS) for encrypted transfers.

    Inherits all FTPFileTransmitOperator functionality with SSL/TLS encryption.
    """

    @cached_property
    def hook(self) -> FTPSHook:
        """
        Create and return an FTPSHook instance.

        :return: Configured FTPS hook with SSL/TLS support
        """

from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from datetime import datetime

dag = DAG('ftp_upload_example', start_date=datetime(2023, 1, 1))

# Upload a single local file to the remote FTP server
upload_task = FTPFileTransmitOperator(
    task_id='upload_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/output.csv',
    remote_filepath='/remote/uploads/output.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,  # Create /remote/uploads/ if it doesn't exist
    dag=dag
)

from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
# Download a single remote file to the local filesystem
download_task = FTPFileTransmitOperator(
    task_id='download_data_file',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/input.csv',
    remote_filepath='/remote/data/input.csv',
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,  # Create /local/data/ if it doesn't exist
    dag=dag
)

from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
# Upload multiple files in one operation; local and remote path lists
# are matched element-by-element.
batch_upload = FTPFileTransmitOperator(
    task_id='batch_upload_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/data/file1.csv',
        '/local/data/file2.csv',
        '/local/data/file3.csv',
    ],
    remote_filepath=[
        '/remote/uploads/file1.csv',
        '/remote/uploads/file2.csv',
        '/remote/uploads/file3.csv',
    ],
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

# Download multiple files in one operation
batch_download = FTPFileTransmitOperator(
    task_id='batch_download_files',
    ftp_conn_id='my_ftp_connection',
    local_filepath=[
        '/local/downloads/report1.pdf',
        '/local/downloads/report2.pdf',
    ],
    remote_filepath=[
        '/remote/reports/report1.pdf',
        '/remote/reports/report2.pdf',
    ],
    operation=FTPOperation.GET,
    create_intermediate_dirs=True,
    dag=dag
)

from airflow.providers.ftp.operators.ftp import FTPSFileTransmitOperator, FTPOperation
# Encrypted transfer over FTPS (FTP over SSL/TLS)
secure_upload = FTPSFileTransmitOperator(
    task_id='secure_upload',
    ftp_conn_id='my_secure_ftp_connection',  # Connection configured for FTPS
    local_filepath='/local/sensitive/data.xml',
    remote_filepath='/remote/secure/data.xml',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
# Using templated file paths with Airflow variables and macros;
# local_filepath and remote_filepath are template fields.
templated_transfer = FTPFileTransmitOperator(
    task_id='templated_transfer',
    ftp_conn_id='my_ftp_connection',
    local_filepath='/local/data/{{ ds }}/report.csv',  # Uses execution date
    remote_filepath='/remote/reports/{{ ds }}/report.csv',
    operation=FTPOperation.PUT,
    create_intermediate_dirs=True,
    dag=dag
)

from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.sensors.ftp import FTPSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_data():
# Data processing logic here
print("Processing downloaded data...")
return "Data processed successfully"
dag = DAG(
'ftp_etl_pipeline',
start_date=datetime(2023, 1, 1),
schedule_interval=timedelta(days=1),
catchup=False
)
# Wait for input file to arrive
wait_for_input = FTPSensor(
task_id='wait_for_input_file',
path='/remote/input/{{ ds }}/data.csv',
ftp_conn_id='source_ftp',
poke_interval=300, # Check every 5 minutes
timeout=3600, # Timeout after 1 hour
dag=dag
)
# Download input file
download_input = FTPFileTransmitOperator(
task_id='download_input_file',
ftp_conn_id='source_ftp',
local_filepath='/local/staging/{{ ds }}/input.csv',
remote_filepath='/remote/input/{{ ds }}/data.csv',
operation=FTPOperation.GET,
create_intermediate_dirs=True,
dag=dag
)
# Process the data
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag
)
# Upload processed results
upload_results = FTPFileTransmitOperator(
task_id='upload_results',
ftp_conn_id='destination_ftp',
local_filepath='/local/output/{{ ds }}/processed_data.csv',
remote_filepath='/remote/output/{{ ds }}/processed_data.csv',
operation=FTPOperation.PUT,
create_intermediate_dirs=True,
dag=dag
)
# Define task dependencies
wait_for_input >> download_input >> process_task >> upload_resultsThe operators handle various error conditions:
When OpenLineage providers are available, the operators automatically generate data lineage information:
Datasets are identified using the `file://hostname:port` URI scheme for proper dataset identification. This enables comprehensive data lineage tracking across FTP file transfer operations within your data pipelines.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-ftp

Evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10