tessl/pypi-apache-airflow-providers-apache-hdfs

Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations.

Workspace: tessl
Visibility: Public
Describes: pypi pkg:pypi/apache-airflow-providers-apache-hdfs@4.10.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-hdfs@4.10.0


Apache Airflow Providers Apache HDFS

Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations. This package enables data engineers to interact with HDFS clusters through Airflow workflows, providing hooks for WebHDFS API operations, sensors for monitoring HDFS file states, and task handlers for logging to HDFS storage.

Package Information

  • Package Name: apache-airflow-providers-apache-hdfs
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-apache-hdfs

Core Imports

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor, MultipleFilesWebHdfsSensor

For logging integration:

from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler

Basic Usage

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def upload_to_hdfs():
    # Create WebHDFS hook
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default')
    
    # Upload a file
    hook.load_file(
        source='/local/path/data.csv',
        destination='/hdfs/path/data.csv',
        overwrite=True
    )
    
    # Check if file exists
    exists = hook.check_for_path('/hdfs/path/data.csv')
    print(f"File exists: {exists}")

# Define DAG
dag = DAG(
    'hdfs_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None
)

# Sensor to wait for file
file_sensor = WebHdfsSensor(
    task_id='wait_for_file',
    filepath='/hdfs/input/data_ready.flag',
    webhdfs_conn_id='webhdfs_default',
    poke_interval=30,
    timeout=300,
    dag=dag
)

# Upload task
upload_task = PythonOperator(
    task_id='upload_to_hdfs',
    python_callable=upload_to_hdfs,
    dag=dag
)

file_sensor >> upload_task

Architecture

The provider follows Airflow's standard architecture patterns:

  • Hooks: Connection-based classes for interacting with external systems (WebHDFSHook)
  • Sensors: Long-running tasks that wait for conditions to be met (WebHdfsSensor, MultipleFilesWebHdfsSensor)
  • Handlers: Infrastructure components for logging and task management (HdfsTaskHandler)
  • Version Compatibility: Abstraction layer for supporting multiple Airflow versions

The package integrates with Airflow's connection system for credential management and supports both insecure and Kerberos-authenticated connections to HDFS clusters.

Capabilities

WebHDFS Operations

Core functionality for interacting with HDFS through the WebHDFS REST API, including file operations, path checking, and connection management with support for SSL, authentication, and high availability configurations.

class WebHDFSHook:
    def __init__(self, webhdfs_conn_id: str = "webhdfs_default", proxy_user: str | None = None): ...
    def get_conn(self) -> Any: ...
    def check_for_path(self, hdfs_path: str) -> bool: ...
    def load_file(self, source: str, destination: str, overwrite: bool = True, parallelism: int = 1, **kwargs) -> None: ...
    def read_file(self, filename: str) -> bytes: ...
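
A short sketch of the methods above, reading a file back from HDFS after confirming it exists (the connection ID and path are placeholders):

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")

# Avoid a failed read by checking the path first
if hook.check_for_path("/hdfs/path/data.csv"):
    # read_file returns the file's raw content as bytes
    content = hook.read_file("/hdfs/path/data.csv")
    print(content.decode("utf-8"))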

See docs/webhdfs-operations.md for details.

File System Monitoring

Sensors for monitoring HDFS file system states, including single file detection and multiple file monitoring capabilities for workflow coordination and data pipeline triggering.

class WebHdfsSensor:
    def __init__(self, *, filepath: str, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...

class MultipleFilesWebHdfsSensor:
    def __init__(self, *, directory_path: str, expected_filenames, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...
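
The single-file sensor appears in Basic Usage above; the sketch below shows MultipleFilesWebHdfsSensor waiting for a whole set of files in one directory (DAG name, directory, and filenames are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor

with DAG("hdfs_multi_file_example", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    # Succeeds only once every expected filename is present in the directory
    wait_for_batch = MultipleFilesWebHdfsSensor(
        task_id="wait_for_batch",
        directory_path="/hdfs/input/batch_2024_01_01",
        expected_filenames=["orders.csv", "customers.csv", "items.csv"],
        webhdfs_conn_id="webhdfs_default",
        poke_interval=60,
        timeout=3600,
    )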

See docs/monitoring-sensors.md for details.

HDFS Logging Integration

Task handlers for storing Airflow task logs in HDFS, enabling centralized log management and integration with Hadoop ecosystem logging infrastructure.

class HdfsTaskHandler:
    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs): ...
    def set_context(self, ti, *, identifier: str | None = None) -> None: ...
    def close(self) -> None: ...
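
The handler is typically wired in through Airflow's remote logging settings. A minimal sketch, assuming the handler is selected when the remote base log folder uses an hdfs:// prefix and that settings are supplied as environment-variable overrides (the path and connection ID are placeholders):

import os

# Remote logging settings as AIRFLOW__<SECTION>__<KEY> overrides;
# the same keys can live in the [logging] section of airflow.cfg.
os.environ["AIRFLOW__LOGGING__REMOTE_LOGGING"] = "True"
os.environ["AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER"] = "hdfs://namenode.example.com/airflow/logs"
os.environ["AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID"] = "webhdfs_default"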

See docs/logging-integration.md for details.

Connection Configuration

The package uses Airflow's connection system with the following configuration:

  • Connection Type: webhdfs
  • Default Connection ID: webhdfs_default
  • Host: HDFS namenode(s), comma-separated for HA setups
  • Port: WebHDFS port (typically 9870 on Hadoop 3.x, 50070 on Hadoop 2.x)
  • Login: Username for authentication
  • Password: Password for basic authentication (optional)
  • Schema: WebHDFS path prefix (optional)

Extra Configuration Options

# Connection extras support the following options:
{
    "use_ssl": bool,           # Enable HTTPS connections
    "verify": bool | str,      # SSL certificate verification
    "cert": str,               # Client certificate path for mTLS
    "key": str,                # Client key path for mTLS
    "cookies": dict,           # Custom cookies
    "headers": dict            # Custom headers
}
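
One way to supply this connection is sketched below, using Airflow's JSON connection format in an environment variable (hosts, user, and certificate path are placeholders); the same fields can be set through the UI, CLI, or a secrets backend:

import json
import os

# Hypothetical webhdfs_default connection definition
os.environ["AIRFLOW_CONN_WEBHDFS_DEFAULT"] = json.dumps({
    "conn_type": "webhdfs",
    "host": "namenode1.example.com,namenode2.example.com",  # comma-separated for HA
    "port": 9870,
    "login": "hdfs_user",
    "extra": {
        "use_ssl": True,
        "verify": "/etc/ssl/certs/ca-bundle.crt",
    },
})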

Types

from airflow.utils.context import Context
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
from collections.abc import Sequence
from typing import Any
import os
from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient

# Connection client type returned by WebHDFSHook.get_conn()
Client = InsecureClient | KerberosClient

# Task handler return types (aliases for list[str])
LogMessages = list[str]
LogSourceInfo = list[str]
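
As a small, hypothetical illustration of where the Client alias shows up: get_conn() hands back the underlying hdfs client, whose own API (for example list()) can then be used directly:

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def list_hdfs_directory(path: str) -> list[str]:
    # get_conn() returns the underlying hdfs client
    # (InsecureClient or KerberosClient, i.e. the Client alias above)
    client = WebHDFSHook(webhdfs_conn_id="webhdfs_default").get_conn()
    # list() is part of the hdfs library's client API, not the hook's
    return client.list(path)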