Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core functionality for interacting with HDFS through the WebHDFS REST API. The WebHDFSHook provides a comprehensive interface for file operations, connection management, and authentication with HDFS clusters.
Establishes connections to HDFS namenode clusters with support for high availability configurations, automatic failover, and various authentication methods.
class WebHDFSHook:
"""
Main hook for interacting with HDFS via WebHDFS API.
Attributes:
conn_type (str): Connection type identifier ("webhdfs")
conn_name_attr (str): Connection name attribute ("webhdfs_conn_id")
default_conn_name (str): Default connection name ("webhdfs_default")
hook_name (str): Human readable hook name ("Apache WebHDFS")
"""
def __init__(self, webhdfs_conn_id: str = "webhdfs_default", proxy_user: str | None = None):
"""
Initialize WebHDFS hook.
Parameters:
webhdfs_conn_id: The connection id for the webhdfs client to connect to
proxy_user: The user used to authenticate
"""
def get_conn(self) -> Any:
"""
Establish a connection depending on the security mode set via config or environment variable.
Returns:
Any: A hdfscli client object (InsecureClient or KerberosClient)
Raises:
AirflowWebHDFSHookException: If failed to locate valid server
"""

Check for the existence of files and directories in HDFS file system.
def check_for_path(self, hdfs_path: str) -> bool:
"""
Check for the existence of a path in HDFS by querying FileStatus.
Parameters:
hdfs_path: The path to check
Returns:
bool: True if the path exists and False if not
"""

Usage example:
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
hook = WebHDFSHook(webhdfs_conn_id='my_hdfs_conn')
# Check if a file exists
file_exists = hook.check_for_path('/user/data/input.csv')
print(f"File exists: {file_exists}")
# Check if a directory exists
dir_exists = hook.check_for_path('/user/data/staging/')
print(f"Directory exists: {dir_exists}")

Upload files and directories from local filesystem to HDFS with configurable parallelism and overwrite behavior.
def load_file(
self,
source: str,
destination: str,
overwrite: bool = True,
parallelism: int = 1,
**kwargs
) -> None:
"""
Upload a file to HDFS.
Parameters:
source: Local path to file or folder. If it's a folder, all the files inside it
will be uploaded. Note: This implies that folders empty of files will not
be created remotely.
destination: Target HDFS path. If it already exists and is a directory, files
will be uploaded inside.
overwrite: Overwrite any existing file or directory
parallelism: Number of threads to use for parallelization. A value of 0 (or negative)
uses as many threads as there are files.
**kwargs: Keyword arguments forwarded to hdfs.client.Client.upload
Raises:
HdfsError: If upload operation fails
"""

Usage examples:
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
hook = WebHDFSHook()
# Upload a single file
hook.load_file(
source='/local/data/sales.csv',
destination='/hdfs/warehouse/sales/sales.csv',
overwrite=True
)
# Upload an entire directory with parallel processing
hook.load_file(
source='/local/data/batch_files/',
destination='/hdfs/warehouse/batch/',
overwrite=True,
parallelism=4 # Use 4 parallel threads
)
# Upload with custom hdfs client options
hook.load_file(
source='/local/data/large_file.parquet',
destination='/hdfs/warehouse/large_file.parquet',
overwrite=False,
parallelism=1,
chunk_size=65536, # Custom chunk size for large files
permission=755 # Set file permissions
)

Read file content directly from HDFS into memory as bytes.
def read_file(self, filename: str) -> bytes:
"""
Read a file from HDFS.
Parameters:
filename: The path of the file to read
Returns:
bytes: File content as raw bytes
Raises:
HdfsError: If file cannot be read or does not exist
"""

Usage examples:
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
hook = WebHDFSHook()
# Read a text file
content = hook.read_file('/hdfs/data/config.txt')
text_content = content.decode('utf-8')
print(text_content)
# Read a binary file
binary_data = hook.read_file('/hdfs/data/image.png')
print(f"File size: {len(binary_data)} bytes")
# Read and process CSV data
csv_bytes = hook.read_file('/hdfs/data/sales.csv')
csv_text = csv_bytes.decode('utf-8')
# Process with pandas
import io
import pandas as pd
df = pd.read_csv(io.StringIO(csv_text))
print(df.head())

Set up WebHDFS connections through Airflow's connection interface:
# Connection configuration
conn_id = 'my_hdfs_cluster'
conn_type = 'webhdfs'
host = 'namenode1.example.com,namenode2.example.com' # HA setup
port = 9870
login = 'hdfs_user'
password = 'optional_password' # For basic auth
schema = 'webhdfs/v1' # Optional path prefix

Configure SSL, certificates, and authentication through connection extras:
# SSL Configuration
extras = {
"use_ssl": True, # Enable HTTPS
"verify": "/path/to/ca-cert.pem", # CA certificate for verification
"cert": "/path/to/client-cert.pem", # Client certificate for mTLS
"key": "/path/to/client-key.pem", # Client private key for mTLS
"cookies": {"session": "abc123"}, # Custom cookies
"headers": {"X-Custom": "value"} # Custom headers
}

The hook automatically detects Kerberos security mode from Airflow configuration:
# Kerberos is enabled when core.security = "kerberos" in airflow.cfg
# The hook will automatically use KerberosClient instead of InsecureClient
# Ensure proper Kerberos configuration and ticket availability
# Example with Kerberos
hook = WebHDFSHook(
webhdfs_conn_id='kerberos_hdfs_conn',
proxy_user='data_engineer' # Optional proxy user
)

Configure multiple namenodes for automatic failover:
# In Airflow connection configuration:
# Host: namenode1.example.com,namenode2.example.com,namenode3.example.com
# The hook will automatically try each namenode until it finds an active one
hook = WebHDFSHook(webhdfs_conn_id='ha_hdfs_cluster')
# Hook will test connectivity to each namenode and use the first available one

class AirflowWebHDFSHookException(AirflowException):
"""Exception specific for WebHDFS hook operations."""

This exception is raised by WebHDFS hook operations when errors occur during connection establishment, file operations, or configuration issues.
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook, AirflowWebHDFSHookException
from hdfs import HdfsError
hook = WebHDFSHook()
try:
# Check connection
client = hook.get_conn()
# Perform file operations
hook.load_file('/local/file.txt', '/hdfs/file.txt')
except AirflowWebHDFSHookException as e:
print(f"Hook error: {e}")
# Handle connection or configuration issues
except HdfsError as e:
print(f"HDFS operation error: {e}")
# Handle HDFS-specific errors (file not found, permissions, etc.)
except Exception as e:
print(f"Unexpected error: {e}")
# Handle other errors

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from datetime import datetime
def process_hdfs_data():
hook = WebHDFSHook(webhdfs_conn_id='prod_hdfs')
# Check if input file exists
if hook.check_for_path('/input/daily_data.csv'):
# Read and process data
data = hook.read_file('/input/daily_data.csv')
processed_data = process_data(data) # Your processing logic
# Upload processed result
with open('/tmp/processed.csv', 'wb') as f:
f.write(processed_data)
hook.load_file('/tmp/processed.csv', '/output/processed_data.csv')
print("Data processing completed successfully")
else:
raise ValueError("Input file not found in HDFS")
dag = DAG('hdfs_processing', start_date=datetime(2024, 1, 1))
task = PythonOperator(
task_id='process_hdfs_data',
python_callable=process_hdfs_data,
dag=dag
)

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
class CustomHDFSHook(WebHDFSHook):
"""Custom HDFS hook with additional functionality."""
def upload_with_metadata(self, source: str, destination: str, metadata: dict):
"""Upload file with custom metadata handling."""
# Upload main file
self.load_file(source, destination)
# Upload metadata file
metadata_path = f"{destination}.metadata"
with open('/tmp/metadata.json', 'w') as f:
json.dump(metadata, f)
self.load_file('/tmp/metadata.json', metadata_path)
def list_directory_contents(self, path: str) -> list:
"""List contents of HDFS directory."""
client = self.get_conn()
return client.list(path)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-hdfs