# Big Data and Distributed Systems

Integration with Hadoop ecosystem (HDFS, WebHDFS) for big data processing workflows. Smart-open provides seamless access to distributed file systems commonly used in big data and analytics environments.

## Capabilities

### HDFS Operations

Direct access to Hadoop Distributed File System through command-line interface.

```python { .api }
def open(uri, mode):
    """Open HDFS resource via CLI tools.

    Parameters:
        uri: str - HDFS URI (hdfs://namenode:port/path or hdfs:///path)
        mode: str - File mode ('rb', 'wb', 'r', 'w')

    Returns:
        File-like object for HDFS operations

    Notes:
        Requires hadoop CLI tools to be installed and accessible
        Uses subprocess calls to hdfs dfs commands
    """

def parse_uri(uri_as_string):
    """Parse HDFS URI into components.

    Returns:
        dict with scheme, namenode, port, path components
    """
```

### WebHDFS Operations

HTTP-based access to HDFS through WebHDFS REST API.

```python { .api }
def open(http_uri, mode, min_part_size=50*1024**2):
    """Open WebHDFS resource via REST API.

    Parameters:
        http_uri: str - WebHDFS HTTP endpoint URI
        mode: str - File mode ('rb', 'wb', 'r', 'w')
        min_part_size: int - Minimum part size for chunked operations

    Returns:
        File-like object for WebHDFS operations

    Notes:
        Uses HTTP requests to WebHDFS REST API
        Supports authentication via transport_params
    """

def parse_uri(uri_as_string):
    """Parse WebHDFS URI into components.

    Returns:
        dict with scheme, host, port, path, namenode components
    """
```

## Usage Examples

### HDFS Examples

```python
from smart_open import open

# Read from HDFS using default namenode
with open('hdfs:///user/data/input.txt', 'rb') as f:
    content = f.read()

# Read from specific namenode
with open('hdfs://namenode.example.com:9000/user/data/file.txt', 'rb') as f:
    data = f.read()

# Write to HDFS
with open('hdfs:///user/output/results.txt', 'w') as f:
    f.write('Processing results')

# Binary operations
with open('hdfs:///user/data/binary-file.dat', 'rb') as f:
    binary_data = f.read()

# Text mode with encoding
with open('hdfs:///user/logs/application.log', 'r', encoding='utf-8') as f:
    for line in f:
        if 'ERROR' in line:
            print(line.strip())

# ViewFS (federated HDFS)
with open('viewfs:///user/data/federated-file.txt', 'rb') as f:
    content = f.read()
```

### WebHDFS Examples

```python
# Basic WebHDFS read
with open('webhdfs://namenode.example.com:50070/user/data/file.txt', 'rb') as f:
    content = f.read()

# WebHDFS write
with open('webhdfs://namenode.example.com:50070/user/output/data.txt', 'w') as f:
    f.write('WebHDFS content')

# With authentication (Kerberos, delegation tokens, etc.)
transport_params = {
    'user': 'hadoop-user',
    'delegation_token': 'token-string'
}
with open('webhdfs://namenode:50070/user/data/secure-file.txt', 'rb',
          transport_params=transport_params) as f:
    content = f.read()

# Custom WebHDFS parameters
transport_params = {
    'replication': 3,
    'blocksize': 134217728,  # 128MB
    'permission': '755'
}
with open('webhdfs://namenode:50070/user/output/large-file.dat', 'wb',
          transport_params=transport_params) as f:
    f.write(large_data)

# Direct WebHDFS module usage
from smart_open.webhdfs import open as webhdfs_open

with webhdfs_open('http://namenode:50070/webhdfs/v1/user/data/file.txt', 'rb') as f:
    content = f.read()
```

## Configuration and Setup

### HDFS CLI Setup

```bash
# Ensure Hadoop CLI tools are installed and configured
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

# Test HDFS connectivity
hdfs dfs -ls /

# Set HDFS configuration (core-site.xml, hdfs-site.xml)
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
```

### WebHDFS Configuration

```python
# WebHDFS endpoint configuration
WEBHDFS_BASE_URL = 'http://namenode.example.com:50070'

# Authentication configuration
transport_params = {
    'user': 'your-username',
    'timeout': 30,
    'headers': {
        'User-Agent': 'smart-open-client/1.0'
    }
}

# Kerberos authentication (if enabled)
transport_params = {
    'auth': 'kerberos',  # Requires requests-kerberos
    'principal': 'user@REALM.COM'
}

# Custom SSL configuration for secure WebHDFS
transport_params = {
    'verify': '/path/to/ca-cert.pem',
    'cert': ('/path/to/client-cert.pem', '/path/to/client-key.pem')
}
```

## Integration with Big Data Frameworks

### Apache Spark Integration

```python
# Reading HDFS data in PySpark application
import json

from pyspark.sql import SparkSession
from smart_open import open

spark = SparkSession.builder.appName("HDFSReader").getOrCreate()

# Read metadata or small config files
with open('hdfs:///user/config/spark-config.json') as f:
    config = json.load(f)

# Process large datasets with Spark
df = spark.read.parquet('hdfs:///user/data/large-dataset.parquet')

# Write results back to HDFS
with open('hdfs:///user/output/summary.txt', 'w') as f:
    f.write(f"Processed {df.count()} records")
```

### MapReduce Integration

```python
# Hadoop Streaming job with smart-open
import sys
from smart_open import open

# Read input from HDFS
for line in sys.stdin:
    # Process line
    result = process_line(line.strip())

    # Write intermediate results to HDFS
    with open('hdfs:///user/temp/intermediate-results.txt', 'a') as f:
        f.write(f"{result}\n")
```

### Apache Kafka Integration

```python
# Read HDFS data for Kafka producers
from kafka import KafkaProducer
from smart_open import open

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Stream data from HDFS to Kafka
with open('hdfs:///user/streaming/data.jsonl', 'rb') as f:
    for line in f:
        producer.send('data-topic', line)

producer.flush()
producer.close()
```

## Performance Optimization

### HDFS Performance Tips

```python
# Use binary mode for better performance
with open('hdfs:///user/data/large-file.dat', 'rb') as f:
    # Process in chunks
    while True:
        chunk = f.read(1024 * 1024)  # 1MB chunks
        if not chunk:
            break
        process_chunk(chunk)

# Parallel processing with multiple HDFS files
import concurrent.futures
import glob

def process_hdfs_file(filepath):
    with open(f'hdfs://{filepath}', 'rb') as f:
        return process_data(f.read())

# Process multiple files in parallel
# NOTE: glob.glob matches paths on the *local* filesystem; for paths that
# exist only in HDFS, obtain the listing with `hdfs dfs -ls` instead.
hdfs_files = glob.glob('/user/data/part-*')
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_hdfs_file, hdfs_files))
```

### WebHDFS Performance Tips

```python
# Connection pooling for WebHDFS
import requests
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20
)
session.mount('http://', adapter)
session.mount('https://', adapter)

transport_params = {'session': session}

# Batch operations
files_to_process = [
    'webhdfs://namenode:50070/user/data/file1.txt',
    'webhdfs://namenode:50070/user/data/file2.txt',
    'webhdfs://namenode:50070/user/data/file3.txt'
]

for file_uri in files_to_process:
    with open(file_uri, 'rb', transport_params=transport_params) as f:
        process_file(f.read())

# Chunked uploads for large files
transport_params = {
    'min_part_size': 64 * 1024 * 1024,  # 64MB chunks
    'timeout': 300
}
with open('webhdfs://namenode:50070/user/output/huge-file.dat', 'wb',
          transport_params=transport_params) as f:
    for chunk in generate_large_data():
        f.write(chunk)
```

## Error Handling and Reliability

### HDFS Error Handling

```python
import subprocess
from smart_open import open

try:
    with open('hdfs:///user/data/file.txt', 'rb') as f:
        content = f.read()
except subprocess.CalledProcessError as e:
    if 'No such file or directory' in str(e):
        print("HDFS file not found")
    elif 'Permission denied' in str(e):
        print("HDFS permission denied")
    else:
        print(f"HDFS command failed: {e}")
except FileNotFoundError:
    print("Hadoop CLI tools not found - ensure HADOOP_HOME is set")
```

### WebHDFS Error Handling

```python
import requests
from smart_open import open

try:
    with open('webhdfs://namenode:50070/user/data/file.txt', 'rb') as f:
        content = f.read()
except requests.exceptions.HTTPError as e:
    status_code = e.response.status_code
    if status_code == 404:
        print("WebHDFS file not found")
    elif status_code == 403:
        print("WebHDFS access forbidden")
    elif status_code == 401:
        print("WebHDFS authentication required")
    else:
        print(f"WebHDFS HTTP error: {status_code}")
except requests.exceptions.ConnectionError:
    print("WebHDFS connection failed - check namenode availability")
except requests.exceptions.Timeout:
    print("WebHDFS request timed out")
```

## Security and Authentication

### HDFS Security

```bash
# Kerberos authentication for HDFS
kinit user@REALM.COM

# Check current Kerberos ticket
klist

# Set Hadoop security configuration
export HADOOP_SECURITY_AUTHENTICATION=kerberos
export HADOOP_SECURITY_AUTHORIZATION=true
```

### WebHDFS Security

```python
# Simple authentication
transport_params = {
    'user': 'hadoop-user'
}

# Delegation token authentication
transport_params = {
    'delegation': 'delegation-token-string'
}

# HTTPS WebHDFS with client certificates
transport_params = {
    'cert': '/path/to/client-cert.pem',
    'verify': '/path/to/ca-cert.pem'
}

# Custom authentication headers
transport_params = {
    'headers': {
        'Authorization': 'Bearer jwt-token-here'
    }
}
```

## Monitoring and Debugging

### HDFS Debugging

```python
import logging
from smart_open import open

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Check HDFS file status before opening
import subprocess
try:
    # '%b' prints the file size in bytes ('%s' is not a valid
    # `hdfs dfs -stat` format specifier)
    result = subprocess.run(['hdfs', 'dfs', '-stat', '%b', '/user/data/file.txt'],
                            capture_output=True, text=True, check=True)
    file_size = int(result.stdout.strip())
    print(f"HDFS file size: {file_size} bytes")
except subprocess.CalledProcessError:
    print("HDFS file does not exist")
```

### WebHDFS Debugging

```python
# Enable HTTP request debugging
import requests
import logging

# Enable debug logging for requests
logging.basicConfig(level=logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True

# Check WebHDFS file status
transport_params = {'timeout': 10}
try:
    response = requests.get(
        'http://namenode:50070/webhdfs/v1/user/data/file.txt?op=GETFILESTATUS',
        **transport_params
    )
    file_info = response.json()
    print(f"WebHDFS file info: {file_info}")
except Exception as e:
    print(f"WebHDFS status check failed: {e}")
```