or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

big-data.md · cloud-storage.md · compression.md · core-operations.md · index.md · network-access.md · utilities.md

docs/big-data.md

0

# Big Data and Distributed Systems

1

2

Integration with Hadoop ecosystem (HDFS, WebHDFS) for big data processing workflows. Smart-open provides seamless access to distributed file systems commonly used in big data and analytics environments.

3

4

## Capabilities

5

6

### HDFS Operations

7

8

Direct access to the Hadoop Distributed File System through the command-line interface.

9

10

```python { .api }

11

def open(uri, mode):

12

"""Open HDFS resource via CLI tools.

13

14

Parameters:

15

uri: str - HDFS URI (hdfs://namenode:port/path or hdfs:///path)

16

mode: str - File mode ('rb', 'wb', 'r', 'w')

17

18

Returns:

19

File-like object for HDFS operations

20

21

Notes:

22

Requires hadoop CLI tools to be installed and accessible

23

Uses subprocess calls to hdfs dfs commands

24

"""

25

26

def parse_uri(uri_as_string):

27

"""Parse HDFS URI into components.

28

29

Returns:

30

dict with scheme, namenode, port, path components

31

"""

32

```

33

34

### WebHDFS Operations

35

36

HTTP-based access to HDFS through the WebHDFS REST API.

37

38

```python { .api }

39

def open(http_uri, mode, min_part_size=50*1024**2):

40

"""Open WebHDFS resource via REST API.

41

42

Parameters:

43

http_uri: str - WebHDFS HTTP endpoint URI

44

mode: str - File mode ('rb', 'wb', 'r', 'w')

45

min_part_size: int - Minimum part size for chunked operations

46

47

Returns:

48

File-like object for WebHDFS operations

49

50

Notes:

51

Uses HTTP requests to WebHDFS REST API

52

Supports authentication via transport_params

53

"""

54

55

def parse_uri(uri_as_string):

56

"""Parse WebHDFS URI into components.

57

58

Returns:

59

dict with scheme, host, port, path, namenode components

60

"""

61

```

62

63

## Usage Examples

64

65

### HDFS Examples

66

67

```python

68

from smart_open import open

69

70

# Read from HDFS using default namenode

71

with open('hdfs:///user/data/input.txt', 'rb') as f:

72

content = f.read()

73

74

# Read from specific namenode

75

with open('hdfs://namenode.example.com:9000/user/data/file.txt', 'rb') as f:

76

data = f.read()

77

78

# Write to HDFS

79

with open('hdfs:///user/output/results.txt', 'w') as f:

80

f.write('Processing results')

81

82

# Binary operations

83

with open('hdfs:///user/data/binary-file.dat', 'rb') as f:

84

binary_data = f.read()

85

86

# Text mode with encoding

87

with open('hdfs:///user/logs/application.log', 'r', encoding='utf-8') as f:

88

for line in f:

89

if 'ERROR' in line:

90

print(line.strip())

91

92

# ViewFS (federated HDFS)

93

with open('viewfs:///user/data/federated-file.txt', 'rb') as f:

94

content = f.read()

95

```

96

97

### WebHDFS Examples

98

99

```python

100

# Basic WebHDFS read

101

with open('webhdfs://namenode.example.com:50070/user/data/file.txt', 'rb') as f:

102

content = f.read()

103

104

# WebHDFS write

105

with open('webhdfs://namenode.example.com:50070/user/output/data.txt', 'w') as f:

106

f.write('WebHDFS content')

107

108

# With authentication (Kerberos, delegation tokens, etc.)

109

transport_params = {

110

'user': 'hadoop-user',

111

'delegation_token': 'token-string'

112

}

113

with open('webhdfs://namenode:50070/user/data/secure-file.txt', 'rb',

114

transport_params=transport_params) as f:

115

content = f.read()

116

117

# Custom WebHDFS parameters

118

transport_params = {

119

'replication': 3,

120

'blocksize': 134217728, # 128MB

121

'permission': '755'

122

}

123

with open('webhdfs://namenode:50070/user/output/large-file.dat', 'wb',

124

transport_params=transport_params) as f:

125

f.write(large_data)

126

127

# Direct WebHDFS module usage

128

from smart_open.webhdfs import open as webhdfs_open

129

130

with webhdfs_open('http://namenode:50070/webhdfs/v1/user/data/file.txt', 'rb') as f:

131

content = f.read()

132

```

133

134

## Configuration and Setup

135

136

### HDFS CLI Setup

137

138

```bash

139

# Ensure Hadoop CLI tools are installed and configured

140

export HADOOP_HOME=/opt/hadoop

141

export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

142

143

# Test HDFS connectivity

144

hdfs dfs -ls /

145

146

# Set HDFS configuration (core-site.xml, hdfs-site.xml)

147

export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop

148

```

149

150

### WebHDFS Configuration

151

152

```python

153

# WebHDFS endpoint configuration

154

WEBHDFS_BASE_URL = 'http://namenode.example.com:50070'

155

156

# Authentication configuration

157

transport_params = {

158

'user': 'your-username',

159

'timeout': 30,

160

'headers': {

161

'User-Agent': 'smart-open-client/1.0'

162

}

163

}

164

165

# Kerberos authentication (if enabled)

166

transport_params = {

167

'auth': 'kerberos', # Requires requests-kerberos

168

'principal': 'user@REALM.COM'

169

}

170

171

# Custom SSL configuration for secure WebHDFS

172

transport_params = {

173

'verify': '/path/to/ca-cert.pem',

174

'cert': ('/path/to/client-cert.pem', '/path/to/client-key.pem')

175

}

176

```

177

178

## Integration with Big Data Frameworks

179

180

### Apache Spark Integration

181

182

```python

183

# Reading HDFS data in PySpark application

184

from pyspark.sql import SparkSession

185

from smart_open import open

186

187

spark = SparkSession.builder.appName("HDFSReader").getOrCreate()

188

189

# Read metadata or small config files

190

with open('hdfs:///user/config/spark-config.json') as f:

191

config = json.load(f)

192

193

# Process large datasets with Spark

194

df = spark.read.parquet('hdfs:///user/data/large-dataset.parquet')

195

196

# Write results back to HDFS

197

with open('hdfs:///user/output/summary.txt', 'w') as f:

198

f.write(f"Processed {df.count()} records")

199

```

200

201

### MapReduce Integration

202

203

```python

204

# Hadoop Streaming job with smart-open

205

import sys

206

from smart_open import open

207

208

# Read input from HDFS

209

for line in sys.stdin:

210

# Process line

211

result = process_line(line.strip())

212

213

# Write intermediate results to HDFS

214

with open('hdfs:///user/temp/intermediate-results.txt', 'a') as f:

215

f.write(f"{result}\n")

216

```

217

218

### Apache Kafka Integration

219

220

```python

221

# Read HDFS data for Kafka producers

222

from kafka import KafkaProducer

223

from smart_open import open

224

225

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

226

227

# Stream data from HDFS to Kafka

228

with open('hdfs:///user/streaming/data.jsonl', 'rb') as f:

229

for line in f:

230

producer.send('data-topic', line)

231

232

producer.flush()

233

producer.close()

234

```

235

236

## Performance Optimization

237

238

### HDFS Performance Tips

239

240

```python

241

# Use binary mode for better performance

242

with open('hdfs:///user/data/large-file.dat', 'rb') as f:

243

# Process in chunks

244

while True:

245

chunk = f.read(1024 * 1024) # 1MB chunks

246

if not chunk:

247

break

248

process_chunk(chunk)

249

250

# Parallel processing with multiple HDFS files

251

import concurrent.futures

252

import glob

253

254

def process_hdfs_file(filepath):

255

with open(f'hdfs://{filepath}', 'rb') as f:

256

return process_data(f.read())

257

258

# Process multiple files in parallel

259

hdfs_files = glob.glob('/user/data/part-*')

260

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:

261

results = list(executor.map(process_hdfs_file, hdfs_files))

262

```

263

264

### WebHDFS Performance Tips

265

266

```python

267

# Connection pooling for WebHDFS

268

import requests

269

session = requests.Session()

270

adapter = requests.adapters.HTTPAdapter(

271

pool_connections=10,

272

pool_maxsize=20

273

)

274

session.mount('http://', adapter)

275

session.mount('https://', adapter)

276

277

transport_params = {'session': session}

278

279

# Batch operations

280

files_to_process = [

281

'webhdfs://namenode:50070/user/data/file1.txt',

282

'webhdfs://namenode:50070/user/data/file2.txt',

283

'webhdfs://namenode:50070/user/data/file3.txt'

284

]

285

286

for file_uri in files_to_process:

287

with open(file_uri, 'rb', transport_params=transport_params) as f:

288

process_file(f.read())

289

290

# Chunked uploads for large files

291

transport_params = {

292

'min_part_size': 64 * 1024 * 1024, # 64MB chunks

293

'timeout': 300

294

}

295

with open('webhdfs://namenode:50070/user/output/huge-file.dat', 'wb',

296

transport_params=transport_params) as f:

297

for chunk in generate_large_data():

298

f.write(chunk)

299

```

300

301

## Error Handling and Reliability

302

303

### HDFS Error Handling

304

305

```python

306

import subprocess

307

from smart_open import open

308

309

try:

310

with open('hdfs:///user/data/file.txt', 'rb') as f:

311

content = f.read()

312

except subprocess.CalledProcessError as e:

313

if 'No such file or directory' in str(e):

314

print("HDFS file not found")

315

elif 'Permission denied' in str(e):

316

print("HDFS permission denied")

317

else:

318

print(f"HDFS command failed: {e}")

319

except FileNotFoundError:

320

print("Hadoop CLI tools not found - ensure HADOOP_HOME is set")

321

```

322

323

### WebHDFS Error Handling

324

325

```python

326

import requests

327

from smart_open import open

328

329

try:

330

with open('webhdfs://namenode:50070/user/data/file.txt', 'rb') as f:

331

content = f.read()

332

except requests.exceptions.HTTPError as e:

333

status_code = e.response.status_code

334

if status_code == 404:

335

print("WebHDFS file not found")

336

elif status_code == 403:

337

print("WebHDFS access forbidden")

338

elif status_code == 401:

339

print("WebHDFS authentication required")

340

else:

341

print(f"WebHDFS HTTP error: {status_code}")

342

except requests.exceptions.ConnectionError:

343

print("WebHDFS connection failed - check namenode availability")

344

except requests.exceptions.Timeout:

345

print("WebHDFS request timed out")

346

```

347

348

## Security and Authentication

349

350

### HDFS Security

351

352

```bash

353

# Kerberos authentication for HDFS

354

kinit user@REALM.COM

355

356

# Check current Kerberos ticket

357

klist

358

359

# Set Hadoop security configuration

360

export HADOOP_SECURITY_AUTHENTICATION=kerberos

361

export HADOOP_SECURITY_AUTHORIZATION=true

362

```

363

364

### WebHDFS Security

365

366

```python

367

# Simple authentication

368

transport_params = {

369

'user': 'hadoop-user'

370

}

371

372

# Delegation token authentication

373

transport_params = {

374

'delegation': 'delegation-token-string'

375

}

376

377

# HTTPS WebHDFS with client certificates

378

transport_params = {

379

'cert': '/path/to/client-cert.pem',

380

'verify': '/path/to/ca-cert.pem'

381

}

382

383

# Custom authentication headers

384

transport_params = {

385

'headers': {

386

'Authorization': 'Bearer jwt-token-here'

387

}

388

}

389

```

390

391

## Monitoring and Debugging

392

393

### HDFS Debugging

394

395

```python

396

import logging

397

from smart_open import open

398

399

# Enable debug logging

400

logging.basicConfig(level=logging.DEBUG)

401

402

# Check HDFS file status before opening

403

import subprocess

404

try:

405

result = subprocess.run(['hdfs', 'dfs', '-stat', '%s', '/user/data/file.txt'],

406

capture_output=True, text=True, check=True)

407

file_size = int(result.stdout.strip())

408

print(f"HDFS file size: {file_size} bytes")

409

except subprocess.CalledProcessError:

410

print("HDFS file does not exist")

411

```

412

413

### WebHDFS Debugging

414

415

```python

416

# Enable HTTP request debugging

417

import requests

418

import logging

419

420

# Enable debug logging for requests

421

logging.basicConfig(level=logging.DEBUG)

422

requests_log = logging.getLogger("requests.packages.urllib3")

423

requests_log.setLevel(logging.DEBUG)

424

requests_log.propagate = True

425

426

# Check WebHDFS file status

427

transport_params = {'timeout': 10}

428

try:

429

response = requests.get(

430

'http://namenode:50070/webhdfs/v1/user/data/file.txt?op=GETFILESTATUS',

431

**transport_params

432

)

433

file_info = response.json()

434

print(f"WebHDFS file info: {file_info}")

435

except Exception as e:

436

print(f"WebHDFS status check failed: {e}")

437

```