Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-hdfs@4.10.0

This package enables data engineers to interact with HDFS clusters through Airflow workflows, providing hooks for WebHDFS API operations, sensors for monitoring HDFS file states, and task handlers for logging to HDFS storage.
pip install apache-airflow-providers-apache-hdfs

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor, MultipleFilesWebHdfsSensor

For logging integration:
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler

Example: waiting for a flag file and uploading data to HDFS within a DAG:

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def upload_to_hdfs():
    # Create WebHDFS hook using the configured Airflow connection
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default')

    # Upload a file from the local filesystem to HDFS
    hook.load_file(
        source='/local/path/data.csv',
        destination='/hdfs/path/data.csv',
        overwrite=True
    )

    # Check if the file exists in HDFS
    exists = hook.check_for_path('/hdfs/path/data.csv')
    print(f"File exists: {exists}")

# Define the DAG
dag = DAG(
    'hdfs_example',
    start_date=datetime(2024, 1, 1),
    schedule=None
)

# Sensor that waits for a flag file to appear in HDFS
file_sensor = WebHdfsSensor(
    task_id='wait_for_file',
    filepath='/hdfs/input/data_ready.flag',
    webhdfs_conn_id='webhdfs_default',
    poke_interval=30,
    timeout=300,
    dag=dag
)

# Upload task
upload_task = PythonOperator(
    task_id='upload_to_hdfs',
    python_callable=upload_to_hdfs,
    dag=dag
)

file_sensor >> upload_task

The provider follows Airflow's standard architecture patterns: hooks for WebHDFS operations, sensors for monitoring HDFS file states, and task handlers for writing task logs to HDFS.
The package integrates with Airflow's connection system for credential management and supports both insecure and Kerberos-authenticated connections to HDFS clusters.
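As a minimal sketch of connection setup (host, port, and user are placeholders, and the environment-variable approach is just one of Airflow's standard ways to define connections), the webhdfs_default connection can be supplied in URI form:

import os

# Minimal sketch: define the "webhdfs_default" connection through Airflow's
# standard AIRFLOW_CONN_<CONN_ID> environment variable in URI form.
# Host, port, and login are placeholder values, not defaults of this package.
os.environ["AIRFLOW_CONN_WEBHDFS_DEFAULT"] = "webhdfs://hdfs_user@namenode.example.com:9870"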
The WebHDFSHook provides core functionality for interacting with HDFS through the WebHDFS REST API, including file operations, path checking, and connection management, with support for SSL, authentication, and high-availability configurations.
class WebHDFSHook:
    def __init__(self, webhdfs_conn_id: str = "webhdfs_default", proxy_user: str | None = None): ...
    def get_conn(self) -> Any: ...
    def check_for_path(self, hdfs_path: str) -> bool: ...
    def load_file(self, source: str, destination: str, overwrite: bool = True, parallelism: int = 1, **kwargs) -> None: ...
    def read_file(self, filename: str) -> bytes: ...
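Building on the signatures above, a short sketch of checking for and reading a file (the path is a placeholder):

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")

# check_for_path returns a bool; read_file returns the file's raw bytes.
if hook.check_for_path("/hdfs/path/data.csv"):
    content = hook.read_file("/hdfs/path/data.csv")
    print(content.decode("utf-8"))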
The sensors monitor HDFS file system state, covering single-file detection and multiple-file monitoring for workflow coordination and data pipeline triggering.

class WebHdfsSensor:
    def __init__(self, *, filepath: str, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...

class MultipleFilesWebHdfsSensor:
    def __init__(self, *, directory_path: str, expected_filenames, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...
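A hedged sketch of the multiple-file sensor (the directory and filenames are placeholders), waiting for a set of files to land in one HDFS directory:

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor

with DAG("hdfs_multi_file_example", start_date=datetime(2024, 1, 1), schedule=None):
    # Succeeds once every expected filename is present in the directory.
    wait_for_batch = MultipleFilesWebHdfsSensor(
        task_id="wait_for_batch",
        directory_path="/hdfs/input/batch",
        expected_filenames=["orders.csv", "customers.csv"],
        webhdfs_conn_id="webhdfs_default",
        poke_interval=60,
        timeout=600,
    )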
The task handler stores Airflow task logs in HDFS, enabling centralized log management and integration with Hadoop ecosystem logging infrastructure.

class HdfsTaskHandler:
    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs): ...
    def set_context(self, ti, *, identifier: str | None = None) -> None: ...
    def close(self) -> None: ...
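A configuration sketch under the assumption that Airflow's standard remote-logging settings route an hdfs:// base log folder to this handler; the folder path and connection ID are placeholders, and the exact wiring should be confirmed against the Airflow logging documentation:

import os

# Assumed wiring: remote logging with an hdfs:// base folder selects HdfsTaskHandler.
# Folder path and connection ID are placeholders.
os.environ["AIRFLOW__LOGGING__REMOTE_LOGGING"] = "True"
os.environ["AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER"] = "hdfs://airflow/logs"
os.environ["AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID"] = "webhdfs_default"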
The package uses Airflow's connection system with the following configuration: the connection type is webhdfs and the default connection ID is webhdfs_default. The connection extras support the following options:
{
"use_ssl": bool, # Enable HTTPS connections
"verify": bool | str, # SSL certificate verification
"cert": str, # Client certificate path for mTLS
"key": str, # Client key path for mTLS
"cookies": dict, # Custom cookies
"headers": dict # Custom headers
}
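A minimal sketch of attaching these extras programmatically (host, port, and certificate path are placeholders); Connection.get_uri() yields the equivalent URI for an AIRFLOW_CONN_* environment variable:

import json

from airflow.models.connection import Connection

# Placeholder connection illustrating the extras above.
conn = Connection(
    conn_id="webhdfs_default",
    conn_type="webhdfs",
    host="namenode.example.com",
    port=9870,
    extra=json.dumps({"use_ssl": True, "verify": "/etc/ssl/certs/hdfs-ca.pem"}),
)
print(conn.get_uri())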
Types and imports referenced by the signatures above:

from airflow.utils.context import Context
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
from collections.abc import Sequence
from typing import Any
import os
from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient
# Connection client types
Client = InsecureClient | KerberosClient
# Task handler return types
LogMessages = list[str]
LogSourceInfo = list[str]