Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-hdfs@4.10.00
# Apache Airflow Providers Apache HDFS

Provider package for Apache Airflow integration with Hadoop Distributed File System (HDFS) and WebHDFS operations. This package enables data engineers to interact with HDFS clusters through Airflow workflows, providing hooks for WebHDFS API operations, sensors for monitoring HDFS file states, and task handlers for logging to HDFS storage.

## Package Information

- **Package Name**: apache-airflow-providers-apache-hdfs
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-hdfs`

## Core Imports

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor, MultipleFilesWebHdfsSensor
```

For logging integration:

```python
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
```

## Basic Usage

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def upload_to_hdfs():
    # Create WebHDFS hook
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default')

    # Upload a file
    hook.load_file(
        source='/local/path/data.csv',
        destination='/hdfs/path/data.csv',
        overwrite=True
    )

    # Check if file exists
    exists = hook.check_for_path('/hdfs/path/data.csv')
    print(f"File exists: {exists}")


# Define DAG
dag = DAG(
    'hdfs_example',
    start_date=datetime(2024, 1, 1),
    schedule=None
)

# Sensor to wait for file
file_sensor = WebHdfsSensor(
    task_id='wait_for_file',
    filepath='/hdfs/input/data_ready.flag',
    webhdfs_conn_id='webhdfs_default',
    poke_interval=30,
    timeout=300,
    dag=dag
)

# Upload task
upload_task = PythonOperator(
    task_id='upload_to_hdfs',
    python_callable=upload_to_hdfs,
    dag=dag
)

file_sensor >> upload_task
```

## Architecture

The provider follows Airflow's standard architecture patterns:

- **Hooks**: Connection-based classes for interacting with external systems (WebHDFSHook)
- **Sensors**: Long-running tasks that wait for conditions to be met (WebHdfsSensor, MultipleFilesWebHdfsSensor)
- **Handlers**: Infrastructure components for logging and task management (HdfsTaskHandler)
- **Version Compatibility**: Abstraction layer for supporting multiple Airflow versions

The package integrates with Airflow's connection system for credential management and supports both insecure and Kerberos-authenticated connections to HDFS clusters.
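
As a rough illustration of that split, the sketch below builds a client for either mode with the underlying `hdfs` library. It is a simplified sketch, not the provider's actual selection logic, and the namenode URL and user are placeholders.

```python
# Simplified sketch: choose between a Kerberos-authenticated and a plain
# WebHDFS client. Not the provider's exact implementation.
from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient


def build_client(namenode_url: str, login: str | None, use_kerberos: bool):
    """Return a Kerberos-authenticated or plain WebHDFS client."""
    if use_kerberos:
        # Kerberos (SPNEGO) authentication; relies on a valid ticket cache.
        return KerberosClient(namenode_url)
    # Plain WebHDFS access as the given user.
    return InsecureClient(namenode_url, user=login)


# Placeholder namenode URL and user.
client = build_client("http://namenode.example.com:9870", login="hdfs", use_kerberos=False)
```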

## Capabilities

### WebHDFS Operations

Core functionality for interacting with HDFS through the WebHDFS REST API, including file operations, path checking, and connection management with support for SSL, authentication, and high availability configurations.

```python { .api }
class WebHDFSHook:
    def __init__(self, webhdfs_conn_id: str = "webhdfs_default", proxy_user: str | None = None): ...
    def get_conn(self) -> Any: ...
    def check_for_path(self, hdfs_path: str) -> bool: ...
    def load_file(self, source: str, destination: str, overwrite: bool = True, parallelism: int = 1, **kwargs) -> None: ...
    def read_file(self, filename: str) -> bytes: ...
```

[WebHDFS Operations](./webhdfs-operations.md)
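
A short usage sketch combining the methods listed above; the connection ID is the provider default, while the HDFS and local paths are placeholders.

```python
# Check for a file, read it if present, otherwise upload a local copy.
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")

if hook.check_for_path("/data/raw/events.json"):
    # read_file returns the file contents as bytes
    content: bytes = hook.read_file("/data/raw/events.json")
    print(f"Read {len(content)} bytes from HDFS")
else:
    # Upload a local copy when the file is missing
    hook.load_file(
        source="/tmp/events.json",
        destination="/data/raw/events.json",
        overwrite=True,
    )
```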

### File System Monitoring

Sensors for monitoring HDFS file system states, including single file detection and multiple file monitoring capabilities for workflow coordination and data pipeline triggering.

```python { .api }
class WebHdfsSensor:
    def __init__(self, *, filepath: str, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...

class MultipleFilesWebHdfsSensor:
    def __init__(self, *, directory_path: str, expected_filenames, webhdfs_conn_id: str = "webhdfs_default", **kwargs): ...
    def poke(self, context) -> bool: ...
```

[File System Monitoring](./monitoring-sensors.md)
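
A minimal sketch of the multi-file sensor inside a DAG; the DAG ID, directory, and file names are placeholders, and `poke_interval`/`timeout` are standard Airflow sensor arguments.

```python
# Wait for a set of partition files to land before downstream tasks run.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor

with DAG("hdfs_multi_file_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    wait_for_partitions = MultipleFilesWebHdfsSensor(
        task_id="wait_for_partitions",
        directory_path="/data/daily/2024-01-01",          # placeholder directory
        expected_filenames=["part-00000.parquet", "part-00001.parquet"],
        webhdfs_conn_id="webhdfs_default",
        poke_interval=60,      # re-check every minute
        timeout=60 * 60,       # give up after one hour
    )
```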

### HDFS Logging Integration

Task handlers for storing Airflow task logs in HDFS, enabling centralized log management and integration with Hadoop ecosystem logging infrastructure.

```python { .api }
class HdfsTaskHandler:
    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs): ...
    def set_context(self, ti, *, identifier: str | None = None) -> None: ...
    def close(self) -> None: ...
```

[HDFS Logging Integration](./logging-integration.md)
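
One way this handler can be wired in is through a custom logging configuration that swaps Airflow's default task handler; the sketch below assumes the standard `DEFAULT_LOGGING_CONFIG` template and uses placeholder log folders. Many deployments configure remote logging through `airflow.cfg` settings instead.

```python
# Sketch only: point Airflow's "task" handler at HdfsTaskHandler via a custom
# logging config module. Folder paths are placeholders for your environment.
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG["handlers"]["task"] = {
    "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
    "formatter": "airflow",
    "base_log_folder": "/opt/airflow/logs",                   # local staging folder
    "hdfs_log_folder": "hdfs://namenode:8020/airflow/logs",   # remote HDFS folder
}
```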

## Connection Configuration

The package uses Airflow's connection system with the following configuration:

- **Connection Type**: `webhdfs`
- **Default Connection ID**: `webhdfs_default`
- **Host**: HDFS namenode(s), comma-separated for HA setups
- **Port**: WebHDFS HTTP port (typically 9870 for Hadoop 3.x, 50070 for Hadoop 2.x)
- **Login**: Username for authentication
- **Password**: Password for basic authentication (optional)
- **Schema**: WebHDFS path prefix (optional)
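
For example, a matching connection could be defined programmatically (or exported as the `AIRFLOW_CONN_WEBHDFS_DEFAULT` environment variable); the hostnames and login below are placeholders.

```python
# Build the webhdfs connection in code and print its URI form.
from airflow.models.connection import Connection

conn = Connection(
    conn_id="webhdfs_default",
    conn_type="webhdfs",
    host="namenode1.example.com,namenode2.example.com",  # comma-separated for HA
    port=9870,
    login="hdfs_user",
)
# The URI can be exported as AIRFLOW_CONN_WEBHDFS_DEFAULT instead of storing
# the connection in the metadata database.
print(conn.get_uri())
```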

### Extra Configuration Options

```python { .api }
# Connection extras support the following options:
{
    "use_ssl": bool,       # Enable HTTPS connections
    "verify": bool | str,  # SSL certificate verification
    "cert": str,           # Client certificate path for mTLS
    "key": str,            # Client key path for mTLS
    "cookies": dict,       # Custom cookies
    "headers": dict        # Custom headers
}
```
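
A hypothetical secure connection carrying these extras might look like the following; the host, port, and certificate paths are placeholders.

```python
# Attach the SSL-related extras to a webhdfs connection as a JSON string.
import json

from airflow.models.connection import Connection

secure_conn = Connection(
    conn_id="webhdfs_secure",
    conn_type="webhdfs",
    host="namenode.example.com",
    port=9871,  # placeholder HTTPS WebHDFS port
    extra=json.dumps({
        "use_ssl": True,
        "verify": "/etc/ssl/certs/cluster-ca.pem",
        "cert": "/etc/ssl/certs/client.pem",
        "key": "/etc/ssl/private/client.key",
    }),
)
```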

## Types

```python { .api }
from airflow.utils.context import Context
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
from airflow.utils.log.file_task_handler import LogMessages, LogSourceInfo
from collections.abc import Sequence
from typing import Any
import os
from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient

# Connection client types
Client = InsecureClient | KerberosClient

# Task handler return types (aliases of list[str])
LogMessages = list[str]
LogSourceInfo = list[str]
```
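
As an illustration of the `Client` union, the helper below calls `WebHDFSHook.get_conn()` and then uses the underlying `hdfs` library client directly; the `.list()` call belongs to the `hdfs` package clients, not to the hook itself, and the path is a placeholder.

```python
# Typed helper around the raw client returned by WebHDFSHook.get_conn().
from hdfs import InsecureClient
from hdfs.ext.kerberos import KerberosClient

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

Client = InsecureClient | KerberosClient


def list_directory(hook: WebHDFSHook, path: str) -> list[str]:
    # get_conn() returns the underlying hdfs client (typed as Any above).
    client: Client = hook.get_conn()
    # The hdfs library clients expose WebHDFS LISTSTATUS as .list().
    return client.list(path)
```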
```