# WebHDFS Operations

Core functionality for interacting with HDFS through the WebHDFS REST API. The WebHDFSHook provides a comprehensive interface for file operations, connection management, and authentication with HDFS clusters.

## Capabilities

### Connection Management

Establishes connections to HDFS namenode clusters with support for high availability configurations, automatic failover, and various authentication methods.

```python { .api }
class WebHDFSHook:
    """
    Main hook for interacting with HDFS via the WebHDFS API.

    Attributes:
        conn_type (str): Connection type identifier ("webhdfs")
        conn_name_attr (str): Connection name attribute ("webhdfs_conn_id")
        default_conn_name (str): Default connection name ("webhdfs_default")
        hook_name (str): Human-readable hook name ("Apache WebHDFS")
    """

    def __init__(self, webhdfs_conn_id: str = "webhdfs_default", proxy_user: str | None = None):
        """
        Initialize the WebHDFS hook.

        Parameters:
            webhdfs_conn_id: The connection id for the webhdfs client to connect to
            proxy_user: The user used to authenticate
        """

    def get_conn(self) -> Any:
        """
        Establish a connection depending on the security mode set via config or environment variable.

        Returns:
            Any: An hdfscli client object (InsecureClient or KerberosClient)

        Raises:
            AirflowWebHDFSHookException: If no valid namenode server can be located
        """
```
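
Usage example (a minimal sketch; the connection id, proxy user, and path below are illustrative):

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id='my_hdfs_conn', proxy_user='etl_user')

# get_conn() returns the underlying hdfscli client (InsecureClient or
# KerberosClient, depending on the configured security mode). It can be used
# directly for operations the hook does not wrap.
client = hook.get_conn()
print(client.status('/', strict=False))  # FileStatus of the root path, or None if missing
```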

### Path Operations

Check for the existence of files and directories in the HDFS file system.

```python { .api }
def check_for_path(self, hdfs_path: str) -> bool:
    """
    Check for the existence of a path in HDFS by querying FileStatus.

    Parameters:
        hdfs_path: The path to check

    Returns:
        bool: True if the path exists and False if not
    """
```

Usage example:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id='my_hdfs_conn')

# Check if a file exists
file_exists = hook.check_for_path('/user/data/input.csv')
print(f"File exists: {file_exists}")

# Check if a directory exists
dir_exists = hook.check_for_path('/user/data/staging/')
print(f"Directory exists: {dir_exists}")
```

### File Upload Operations

Upload files and directories from the local filesystem to HDFS with configurable parallelism and overwrite behavior.

```python { .api }
def load_file(
    self,
    source: str,
    destination: str,
    overwrite: bool = True,
    parallelism: int = 1,
    **kwargs
) -> None:
    """
    Upload a file to HDFS.

    Parameters:
        source: Local path to file or folder. If it's a folder, all the files inside it
            will be uploaded. Note: This implies that folders empty of files will not
            be created remotely.
        destination: Target HDFS path. If it already exists and is a directory, files
            will be uploaded inside.
        overwrite: Overwrite any existing file or directory
        parallelism: Number of threads to use for parallelization. A value of 0 (or negative)
            uses as many threads as there are files.
        **kwargs: Keyword arguments forwarded to hdfs.client.Client.upload

    Raises:
        HdfsError: If upload operation fails
    """
```

Usage examples:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook()

# Upload a single file
hook.load_file(
    source='/local/data/sales.csv',
    destination='/hdfs/warehouse/sales/sales.csv',
    overwrite=True
)

# Upload an entire directory with parallel processing
hook.load_file(
    source='/local/data/batch_files/',
    destination='/hdfs/warehouse/batch/',
    overwrite=True,
    parallelism=4  # Use 4 parallel threads
)

# Upload with custom hdfs client options
hook.load_file(
    source='/local/data/large_file.parquet',
    destination='/hdfs/warehouse/large_file.parquet',
    overwrite=False,
    parallelism=1,
    chunk_size=65536,  # Custom chunk size for large files
    permission=755  # Set file permissions
)
```

### File Read Operations

Read file content directly from HDFS into memory as bytes.

```python { .api }
def read_file(self, filename: str) -> bytes:
    """
    Read a file from HDFS.

    Parameters:
        filename: The path of the file to read

    Returns:
        bytes: File content as raw bytes

    Raises:
        HdfsError: If file cannot be read or does not exist
    """
```

Usage examples:

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook()

# Read a text file
content = hook.read_file('/hdfs/data/config.txt')
text_content = content.decode('utf-8')
print(text_content)

# Read a binary file
binary_data = hook.read_file('/hdfs/data/image.png')
print(f"File size: {len(binary_data)} bytes")

# Read and process CSV data
csv_bytes = hook.read_file('/hdfs/data/sales.csv')
csv_text = csv_bytes.decode('utf-8')

# Process with pandas
import io
import pandas as pd
df = pd.read_csv(io.StringIO(csv_text))
print(df.head())
```

## Connection Configuration

### Basic Configuration

Set up WebHDFS connections through Airflow's connection interface:

```python
# Connection configuration
conn_id = 'my_hdfs_cluster'
conn_type = 'webhdfs'
host = 'namenode1.example.com,namenode2.example.com'  # HA setup
port = 9870
login = 'hdfs_user'
password = 'optional_password'  # For basic auth
schema = 'webhdfs/v1'  # Optional path prefix
```
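
The same connection can also be defined outside the UI. As a hedged sketch (field values mirror the illustrative block above), one option is to build an `airflow.models.connection.Connection` and expose it through the `AIRFLOW_CONN_<CONN_ID>` environment variable that Airflow reads:

```python
import os

from airflow.models.connection import Connection

conn = Connection(
    conn_id='my_hdfs_cluster',
    conn_type='webhdfs',
    host='namenode1.example.com',  # for HA, a comma-separated host list can be used as above
    port=9870,
    login='hdfs_user',
    password='optional_password',
)

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables;
# get_uri() serializes the connection into the expected URI form.
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()
```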

### SSL and Security Configuration

Configure SSL, certificates, and authentication through connection extras:

```python
# SSL Configuration
extras = {
    "use_ssl": True,                      # Enable HTTPS
    "verify": "/path/to/ca-cert.pem",     # CA certificate for verification
    "cert": "/path/to/client-cert.pem",   # Client certificate for mTLS
    "key": "/path/to/client-key.pem",     # Client private key for mTLS
    "cookies": {"session": "abc123"},     # Custom cookies
    "headers": {"X-Custom": "value"}      # Custom headers
}
```
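
These extras belong in the connection's Extra field. A sketch of wiring them up programmatically (the connection id, host, and port are illustrative; 9871 is the usual HTTPS WebHDFS port):

```python
import json

from airflow.models.connection import Connection

secure_conn = Connection(
    conn_id='secure_hdfs',
    conn_type='webhdfs',
    host='namenode1.example.com',
    port=9871,
    login='hdfs_user',
    extra=json.dumps(extras),  # the extras dict defined above, stored as JSON
)
```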

### Kerberos Authentication

The hook automatically detects Kerberos security mode from the Airflow configuration:

```python
# Kerberos is enabled when core.security = "kerberos" in airflow.cfg
# The hook will automatically use KerberosClient instead of InsecureClient
# Ensure proper Kerberos configuration and ticket availability

# Example with Kerberos
hook = WebHDFSHook(
    webhdfs_conn_id='kerberos_hdfs_conn',
    proxy_user='data_engineer'  # Optional proxy user
)
```
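
To check ahead of time which client the hook will choose, you can read the same `core.security` setting mentioned above (a sketch using Airflow's configuration API):

```python
from airflow.configuration import conf

security_mode = conf.get("core", "security", fallback="")
uses_kerberos = security_mode.lower() == "kerberos"
print(f"Hook will use {'KerberosClient' if uses_kerberos else 'InsecureClient'}")
```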

### High Availability Setup

Configure multiple namenodes for automatic failover:

```python
# In Airflow connection configuration:
# Host: namenode1.example.com,namenode2.example.com,namenode3.example.com
# The hook will automatically try each namenode until it finds an active one

hook = WebHDFSHook(webhdfs_conn_id='ha_hdfs_cluster')
# Hook will test connectivity to each namenode and use the first available one
```

## Error Handling

### Exception Types

```python { .api }
class AirflowWebHDFSHookException(AirflowException):
    """Exception specific for WebHDFS hook operations."""
```

This exception is raised by WebHDFS hook operations when a connection cannot be established, for example when no valid namenode server can be located or the connection is misconfigured.

### Common Error Scenarios

```python
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook, AirflowWebHDFSHookException
from hdfs import HdfsError

hook = WebHDFSHook()

try:
    # Check connection
    client = hook.get_conn()

    # Perform file operations
    hook.load_file('/local/file.txt', '/hdfs/file.txt')

except AirflowWebHDFSHookException as e:
    print(f"Hook error: {e}")
    # Handle connection or configuration issues

except HdfsError as e:
    print(f"HDFS operation error: {e}")
    # Handle HDFS-specific errors (file not found, permissions, etc.)

except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle other errors
```

## Integration with Airflow Tasks

### Using in PythonOperator

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from datetime import datetime

def process_hdfs_data():
    hook = WebHDFSHook(webhdfs_conn_id='prod_hdfs')

    # Check if input file exists
    if hook.check_for_path('/input/daily_data.csv'):
        # Read and process data
        data = hook.read_file('/input/daily_data.csv')
        processed_data = process_data(data)  # Your processing logic

        # Upload processed result
        with open('/tmp/processed.csv', 'wb') as f:
            f.write(processed_data)

        hook.load_file('/tmp/processed.csv', '/output/processed_data.csv')
        print("Data processing completed successfully")
    else:
        raise ValueError("Input file not found in HDFS")

dag = DAG('hdfs_processing', start_date=datetime(2024, 1, 1))

task = PythonOperator(
    task_id='process_hdfs_data',
    python_callable=process_hdfs_data,
    dag=dag
)
```

### Custom Hook Subclassing

```python
import json

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

class CustomHDFSHook(WebHDFSHook):
    """Custom HDFS hook with additional functionality."""

    def upload_with_metadata(self, source: str, destination: str, metadata: dict):
        """Upload a file with custom metadata handling."""
        # Upload the main file
        self.load_file(source, destination)

        # Write the metadata to a temporary file and upload it alongside
        metadata_path = f"{destination}.metadata"
        with open('/tmp/metadata.json', 'w') as f:
            json.dump(metadata, f)

        self.load_file('/tmp/metadata.json', metadata_path)

    def list_directory_contents(self, path: str) -> list:
        """List the contents of an HDFS directory."""
        client = self.get_conn()
        return client.list(path)
```
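
A short usage sketch for the subclass above (connection id, paths, and metadata values are illustrative):

```python
hook = CustomHDFSHook(webhdfs_conn_id='prod_hdfs')

# Upload a data file together with a JSON metadata sidecar
hook.upload_with_metadata(
    source='/local/data/sales.csv',
    destination='/hdfs/warehouse/sales/sales.csv',
    metadata={'owner': 'analytics', 'ingested_at': '2024-01-01'},
)

# List what ended up in the target directory
print(hook.list_directory_contents('/hdfs/warehouse/sales'))
```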