# WebHDFS Operations

Core functionality for interacting with HDFS through the WebHDFS REST API. The WebHDFSHook provides a comprehensive interface for file operations, connection management, and authentication with HDFS clusters.

## Capabilities

### Connection Management

Establishes connections to HDFS namenode clusters with support for high availability configurations, automatic failover, and various authentication methods.

```python { .api }
class WebHDFSHook:
    """
    Main hook for interacting with HDFS via the WebHDFS API.

    Attributes:
        conn_type (str): Connection type identifier ("webhdfs")
        conn_name_attr (str): Connection name attribute ("webhdfs_conn_id")
        default_conn_name (str): Default connection name ("webhdfs_default")
        hook_name (str): Human-readable hook name ("Apache WebHDFS")
    """

    def __init__(self, webhdfs_conn_id: str = "webhdfs_default", proxy_user: str | None = None):
        """
        Initialize the WebHDFS hook.

        Parameters:
            webhdfs_conn_id: The connection id for the webhdfs client to connect to
            proxy_user: The user used to authenticate
        """

    def get_conn(self) -> Any:
        """
        Establish a connection depending on the security mode set via config or environment variable.

        Returns:
            Any: An hdfscli client object (InsecureClient or KerberosClient)

        Raises:
            AirflowWebHDFSHookException: If no valid namenode server can be located
        """
```

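A minimal usage sketch follows; the connection id and proxy user are placeholder values, and the `status()` call belongs to the underlying hdfs library's client API rather than the hook itself:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

# 'my_hdfs_conn' and 'etl_user' are placeholder values
hook = WebHDFSHook(webhdfs_conn_id='my_hdfs_conn', proxy_user='etl_user')

# get_conn() returns an hdfscli client (InsecureClient or KerberosClient),
# so the full hdfs library client API is available alongside the hook's helpers
client = hook.get_conn()
print(client.status('/user'))  # FileStatus dictionary for the path
```

Because `get_conn()` hands back the raw client, methods such as `client.list()` or `client.makedirs()` can be called directly when the hook's own helpers are not sufficient.
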
### Path Operations

Check for the existence of files and directories in the HDFS file system.

```python { .api }
def check_for_path(self, hdfs_path: str) -> bool:
    """
    Check for the existence of a path in HDFS by querying FileStatus.

    Parameters:
        hdfs_path: The path to check

    Returns:
        bool: True if the path exists and False if not
    """
```

Usage example:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id='my_hdfs_conn')

# Check if a file exists
file_exists = hook.check_for_path('/user/data/input.csv')
print(f"File exists: {file_exists}")

# Check if a directory exists
dir_exists = hook.check_for_path('/user/data/staging/')
print(f"Directory exists: {dir_exists}")
```

### File Upload Operations

Upload files and directories from the local filesystem to HDFS with configurable parallelism and overwrite behavior.

```python { .api }
def load_file(
    self,
    source: str,
    destination: str,
    overwrite: bool = True,
    parallelism: int = 1,
    **kwargs,
) -> None:
    """
    Upload a file to HDFS.

    Parameters:
        source: Local path to file or folder. If it's a folder, all the files inside it
            will be uploaded. Note: This implies that folders empty of files will not
            be created remotely.
        destination: Target HDFS path. If it already exists and is a directory, files
            will be uploaded inside.
        overwrite: Overwrite any existing file or directory
        parallelism: Number of threads to use for parallelization. A value of 0 (or negative)
            uses as many threads as there are files.
        **kwargs: Keyword arguments forwarded to hdfs.client.Client.upload

    Raises:
        HdfsError: If the upload operation fails
    """
```

Usage examples:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook()

# Upload a single file
hook.load_file(
    source='/local/data/sales.csv',
    destination='/hdfs/warehouse/sales/sales.csv',
    overwrite=True
)

# Upload an entire directory with parallel processing
hook.load_file(
    source='/local/data/batch_files/',
    destination='/hdfs/warehouse/batch/',
    overwrite=True,
    parallelism=4  # Use 4 parallel threads
)

# Upload with custom hdfs client options (forwarded to hdfs.client.Client.upload)
hook.load_file(
    source='/local/data/large_file.parquet',
    destination='/hdfs/warehouse/large_file.parquet',
    overwrite=False,
    parallelism=1,
    chunk_size=65536,   # Custom chunk size for large files
    permission='755'    # File permissions as an octal string
)
```

### File Read Operations

Read file content directly from HDFS into memory as bytes.

```python { .api }
def read_file(self, filename: str) -> bytes:
    """
    Read a file from HDFS.

    Parameters:
        filename: The path of the file to read

    Returns:
        bytes: File content as raw bytes

    Raises:
        HdfsError: If the file cannot be read or does not exist
    """
```

Usage examples:

```python
import io

import pandas as pd

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook()

# Read a text file
content = hook.read_file('/hdfs/data/config.txt')
text_content = content.decode('utf-8')
print(text_content)

# Read a binary file
binary_data = hook.read_file('/hdfs/data/image.png')
print(f"File size: {len(binary_data)} bytes")

# Read and process CSV data
csv_bytes = hook.read_file('/hdfs/data/sales.csv')
csv_text = csv_bytes.decode('utf-8')

# Process with pandas
df = pd.read_csv(io.StringIO(csv_text))
print(df.head())
```

## Connection Configuration

### Basic Configuration

Set up WebHDFS connections through Airflow's connection interface:

```python
# Connection configuration
conn_id = 'my_hdfs_cluster'
conn_type = 'webhdfs'
host = 'namenode1.example.com,namenode2.example.com'  # HA setup
port = 9870
login = 'hdfs_user'
password = 'optional_password'  # For basic auth
schema = 'webhdfs/v1'  # Optional path prefix
```

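One way to register such a connection without the UI is via an `AIRFLOW_CONN_<CONN_ID>` environment variable. The sketch below assumes Airflow 2.3+ (which accepts JSON-serialized connections in these variables) and uses placeholder host names and credentials:

```python
import json
import os

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

# Airflow resolves AIRFLOW_CONN_<CONN_ID> (upper-cased conn_id) when the
# connection is looked up, so it only needs to be set before get_conn() runs.
os.environ['AIRFLOW_CONN_MY_HDFS_CLUSTER'] = json.dumps(
    {
        'conn_type': 'webhdfs',
        'host': 'namenode1.example.com,namenode2.example.com',  # HA namenodes
        'port': 9870,
        'login': 'hdfs_user',
    }
)

hook = WebHDFSHook(webhdfs_conn_id='my_hdfs_cluster')
print(hook.check_for_path('/user'))
```
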
### SSL and Security Configuration

Configure SSL, certificates, and authentication through connection extras:

```python
# SSL configuration via the connection's Extra field
extras = {
    "use_ssl": True,                      # Enable HTTPS
    "verify": "/path/to/ca-cert.pem",     # CA certificate for verification
    "cert": "/path/to/client-cert.pem",   # Client certificate for mTLS
    "key": "/path/to/client-key.pem",     # Client private key for mTLS
    "cookies": {"session": "abc123"},     # Custom cookies
    "headers": {"X-Custom": "value"},     # Custom headers
}
```

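These extras are stored as JSON in the connection's Extra field. As a non-authoritative sketch of one way to attach them programmatically (the conn_id, host, and paths are placeholders), `airflow.models.Connection` accepts the extras as a JSON string:

```python
import json

from airflow.models import Connection

# Sketch: build a Connection carrying SSL extras in its Extra field.
# 'secure_hdfs_cluster' is a placeholder conn_id; persist the connection via
# the UI, CLI, a secrets backend, or an AIRFLOW_CONN_* environment variable.
conn = Connection(
    conn_id='secure_hdfs_cluster',
    conn_type='webhdfs',
    host='namenode1.example.com',
    port=9870,
    login='hdfs_user',
    extra=json.dumps({
        'use_ssl': True,
        'verify': '/path/to/ca-cert.pem',
    }),
)
print(conn.get_uri())  # URI form of the same connection
```
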
### Kerberos Authentication

The hook automatically detects Kerberos security mode from the Airflow configuration:

```python
# Kerberos is enabled when core.security = "kerberos" in airflow.cfg.
# The hook will automatically use KerberosClient instead of InsecureClient.
# Ensure proper Kerberos configuration and ticket availability.

# Example with Kerberos
hook = WebHDFSHook(
    webhdfs_conn_id='kerberos_hdfs_conn',
    proxy_user='data_engineer'  # Optional proxy user
)
```

### High Availability Setup

Configure multiple namenodes for automatic failover:

```python
# In the Airflow connection configuration:
# Host: namenode1.example.com,namenode2.example.com,namenode3.example.com
# The hook will automatically try each namenode until it finds an active one.

hook = WebHDFSHook(webhdfs_conn_id='ha_hdfs_cluster')
# The hook will test connectivity to each namenode and use the first available one.
```

## Error Handling

### Exception Types

```python { .api }
class AirflowWebHDFSHookException(AirflowException):
    """Exception specific for WebHDFS hook operations."""
```

This exception is raised when the hook cannot establish a connection or the connection is misconfigured, for example when no valid namenode server can be located.

### Common Error Scenarios

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook, AirflowWebHDFSHookException
from hdfs import HdfsError

hook = WebHDFSHook()

try:
    # Check connection
    client = hook.get_conn()

    # Perform file operations
    hook.load_file('/local/file.txt', '/hdfs/file.txt')

except AirflowWebHDFSHookException as e:
    print(f"Hook error: {e}")
    # Handle connection or configuration issues

except HdfsError as e:
    print(f"HDFS operation error: {e}")
    # Handle HDFS-specific errors (file not found, permissions, etc.)

except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle other errors
```

## Integration with Airflow Tasks

### Using in PythonOperator

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook


def process_hdfs_data():
    hook = WebHDFSHook(webhdfs_conn_id='prod_hdfs')

    # Check if the input file exists
    if hook.check_for_path('/input/daily_data.csv'):
        # Read and process data
        data = hook.read_file('/input/daily_data.csv')
        processed_data = process_data(data)  # Your processing logic

        # Upload the processed result
        with open('/tmp/processed.csv', 'wb') as f:
            f.write(processed_data)

        hook.load_file('/tmp/processed.csv', '/output/processed_data.csv')
        print("Data processing completed successfully")
    else:
        raise ValueError("Input file not found in HDFS")


dag = DAG('hdfs_processing', start_date=datetime(2024, 1, 1))

task = PythonOperator(
    task_id='process_hdfs_data',
    python_callable=process_hdfs_data,
    dag=dag
)
```

### Custom Hook Subclassing

```python
import json

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook


class CustomHDFSHook(WebHDFSHook):
    """Custom HDFS hook with additional functionality."""

    def upload_with_metadata(self, source: str, destination: str, metadata: dict):
        """Upload a file along with a companion metadata file."""
        # Upload the main file
        self.load_file(source, destination)

        # Write metadata to a temporary local file, then upload it next to the data
        metadata_path = f"{destination}.metadata"
        with open('/tmp/metadata.json', 'w') as f:
            json.dump(metadata, f)

        self.load_file('/tmp/metadata.json', metadata_path)

    def list_directory_contents(self, path: str) -> list:
        """List the contents of an HDFS directory using the underlying client."""
        client = self.get_conn()
        return client.list(path)
```