Provider package for Apache Airflow that enables FTP (File Transfer Protocol) operations, including hooks, operators, and sensors for workflow integration.
84
Low-level hook classes for establishing and managing FTP connections with comprehensive file operations, authentication support, and SSL/TLS encryption capabilities.
Primary hook class for standard FTP connections with full file system operations and connection management.
class FTPHook(BaseHook):
"""
Interact with FTP servers.
Parameters:
- ftp_conn_id (str): The FTP connection ID reference (default: "ftp_default")
"""
conn_name_attr = "ftp_conn_id"
default_conn_name = "ftp_default"
conn_type = "ftp"
hook_name = "FTP"
def __init__(self, ftp_conn_id: str = "ftp_default") -> None: ...
def __enter__(self) -> FTPHook: ...
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
def get_conn(self) -> ftplib.FTP:
"""
Return an FTP connection object.
Returns:
ftplib.FTP: Active FTP connection
"""
def close_conn(self) -> None:
"""
Close the FTP connection.
Raises:
Exception: If connection was never opened
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test FTP connection by calling path with directory.
Returns:
tuple[bool, str]: (success_status, message)
"""

Methods for managing directories on the FTP server.
def describe_directory(self, path: str) -> dict:
"""
Return directory listing with file attributes using MLSD.
Parameters:
- path (str): Full path to remote directory
Returns:
dict: Dictionary of {filename: {attributes}}
"""
def list_directory(self, path: str) -> list[str]:
"""
Return list of files in remote directory using NLST.
Parameters:
- path (str): Full path to remote directory
Returns:
list[str]: List of filenames
"""
def create_directory(self, path: str) -> None:
"""
Create directory on remote system.
Parameters:
- path (str): Full path to remote directory to create
"""
def delete_directory(self, path: str) -> None:
"""
Delete directory on remote system.
Parameters:
- path (str): Full path to remote directory to delete
"""

Methods for transferring and managing files on the FTP server.
def retrieve_file(
self,
remote_full_path: str,
local_full_path_or_buffer: Any,
callback: Callable | None = None,
block_size: int = 8192
) -> None:
"""
Transfer remote file to local location.
Parameters:
- remote_full_path (str): Full path to remote file
- local_full_path_or_buffer (str | file-like): Local path or buffer
- callback (Callable, optional): Called for each data block
- block_size (int): Transfer chunk size (default: 8192)
"""
def store_file(
self,
remote_full_path: str,
local_full_path_or_buffer: Any,
block_size: int = 8192
) -> None:
"""
Transfer local file to remote location.
Parameters:
- remote_full_path (str): Full path to remote file
- local_full_path_or_buffer (str | file-like): Local path or buffer
- block_size (int): Transfer chunk size (default: 8192)
"""
def delete_file(self, path: str) -> None:
"""
Remove file on FTP server.
Parameters:
- path (str): Full path to remote file
"""
def rename(self, from_name: str, to_name: str) -> str:
"""
Rename file on FTP server.
Parameters:
- from_name (str): Current filename
- to_name (str): New filename
Returns:
str: Server response
"""

Methods for retrieving file metadata and properties.
def get_mod_time(self, path: str) -> datetime.datetime:
"""
Return file last modification time.
Parameters:
- path (str): Remote file path
Returns:
datetime.datetime: Last modification timestamp
"""
def get_size(self, path: str) -> int | None:
"""
Return file size in bytes.
Parameters:
- path (str): Remote file path
Returns:
int | None: File size or None if unavailable
"""

Secure FTP hook supporting SSL/TLS encryption for encrypted file transfers.
class FTPSHook(FTPHook):
"""
Interact with FTPS (FTP over SSL/TLS) servers.
Inherits all FTPHook methods with SSL/TLS encryption support.
"""
def get_conn(self) -> ftplib.FTP:
"""
Return FTPS connection object with SSL context.
Returns:
ftplib.FTP: Active FTPS connection with SSL/TLS encryption
"""

from airflow.providers.ftp.hooks.ftp import FTPHook
# Using context manager for automatic connection cleanup
with FTPHook(ftp_conn_id='my_ftp') as hook:
# List directory contents
files = hook.list_directory('/remote/data')
# Download file
hook.retrieve_file('/remote/data/input.csv', '/local/data/input.csv')
# Upload file
hook.store_file('/remote/data/output.csv', '/local/data/output.csv')
# Get file information
size = hook.get_size('/remote/data/input.csv')
mod_time = hook.get_mod_time('/remote/data/input.csv')

from airflow.providers.ftp.hooks.ftp import FTPHook
def progress_callback(data):
print(f"Transferred {len(data)} bytes")
hook = FTPHook(ftp_conn_id='my_ftp')
# Download with progress tracking
hook.retrieve_file(
remote_full_path='/remote/large_file.zip',
local_full_path_or_buffer='/local/large_file.zip',
callback=progress_callback,
block_size=16384
)
hook.close_conn()

from airflow.providers.ftp.hooks.ftp import FTPSHook
# Using FTPS for encrypted transfers
with FTPSHook(ftp_conn_id='my_secure_ftp') as hook:
# All operations are encrypted via SSL/TLS
hook.store_file('/secure/path/confidential.txt', '/local/confidential.txt')
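files = hook.list_directory('/secure/path')

FTP operations may raise various exceptions from the underlying ftplib, such as `ftplib.error_perm` (permanent errors, 5xx responses), `ftplib.error_temp` (temporary errors, 4xx responses), `ftplib.error_proto` (malformed server replies), and the other exception types collected in `ftplib.all_errors`.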
Always handle exceptions appropriately and use the test_connection() method to verify connectivity before performing operations.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-ftp
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10