Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and the local filesystem with transparent compression support.

Integration with the Hadoop ecosystem (HDFS, WebHDFS) for big-data processing workflows. smart_open provides seamless access to the distributed file systems commonly used in big-data and analytics environments.

Direct access to the Hadoop Distributed File System through its command-line interface:
def open(uri, mode):
    """Open an HDFS resource via the Hadoop CLI tools.

    Parameters:
        uri: str - HDFS URI (hdfs://namenode:port/path or hdfs:///path)
        mode: str - File mode ('rb', 'wb', 'r', 'w')

    Returns:
        File-like object for HDFS operations

    Notes:
        Requires the hadoop CLI tools to be installed and accessible.
        Uses subprocess calls to ``hdfs dfs`` commands under the hood.
    """
def parse_uri(uri_as_string):
    """Parse an HDFS URI into its components.

    Returns:
        dict with scheme, namenode, port, path components
    """

# HTTP-based access to HDFS through the WebHDFS REST API.
def open(http_uri, mode, min_part_size=50*1024**2):
    """Open a WebHDFS resource via the REST API.

    Parameters:
        http_uri: str - WebHDFS HTTP endpoint URI
        mode: str - File mode ('rb', 'wb', 'r', 'w')
        min_part_size: int - Minimum part size for chunked uploads
            (defaults to 50 MiB)

    Returns:
        File-like object for WebHDFS operations

    Notes:
        Uses HTTP requests against the WebHDFS REST API.
        Supports authentication via ``transport_params``.
    """
def parse_uri(uri_as_string):
    """Parse a WebHDFS URI into its components.

    Returns:
        dict with scheme, host, port, path, namenode components
    """

from smart_open import open
# Read from HDFS using the default namenode
with open('hdfs:///user/data/input.txt', 'rb') as f:
    content = f.read()

# Read from a specific namenode
with open('hdfs://namenode.example.com:9000/user/data/file.txt', 'rb') as f:
    data = f.read()

# Write to HDFS
with open('hdfs:///user/output/results.txt', 'w') as f:
    f.write('Processing results')

# Binary operations
with open('hdfs:///user/data/binary-file.dat', 'rb') as f:
    binary_data = f.read()

# Text mode with explicit encoding
with open('hdfs:///user/logs/application.log', 'r', encoding='utf-8') as f:
    for line in f:
        if 'ERROR' in line:
            print(line.strip())

# ViewFS (federated HDFS)
with open('viewfs:///user/data/federated-file.txt', 'rb') as f:
    content = f.read()

# Basic WebHDFS read
with open('webhdfs://namenode.example.com:50070/user/data/file.txt', 'rb') as f:
    content = f.read()

# WebHDFS write
with open('webhdfs://namenode.example.com:50070/user/output/data.txt', 'w') as f:
    f.write('WebHDFS content')

# With authentication (Kerberos, delegation tokens, etc.)
transport_params = {
    'user': 'hadoop-user',
    'delegation_token': 'token-string'
}
with open('webhdfs://namenode:50070/user/data/secure-file.txt', 'rb',
          transport_params=transport_params) as f:
    content = f.read()

# Custom WebHDFS parameters
transport_params = {
    'replication': 3,
    'blocksize': 134217728,  # 128MB
    'permission': '755'
}
with open('webhdfs://namenode:50070/user/output/large-file.dat', 'wb',
          transport_params=transport_params) as f:
    f.write(large_data)  # NOTE(review): large_data is a placeholder — define it before running

# Direct WebHDFS module usage
from smart_open.webhdfs import open as webhdfs_open
with webhdfs_open('http://namenode:50070/webhdfs/v1/user/data/file.txt', 'rb') as f:
    content = f.read()

# Ensure Hadoop CLI tools are installed and configured
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

# Test HDFS connectivity
hdfs dfs -ls /

# Set HDFS configuration directory (core-site.xml, hdfs-site.xml)
# (the comment must be a separate word — fused '#' would corrupt the value)
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop

# WebHDFS endpoint configuration
# Base endpoint for the WebHDFS REST API.
# NOTE(review): 50070 is the Hadoop 2.x HTTP port; Hadoop 3.x defaults to 9870 — confirm.
WEBHDFS_BASE_URL = 'http://namenode.example.com:50070'

# Basic authentication configuration
transport_params = {
    'user': 'your-username',
    'timeout': 30,
    'headers': {
        'User-Agent': 'smart-open-client/1.0',
    },
}

# Kerberos authentication (if enabled)
transport_params = {
    'auth': 'kerberos',  # Requires requests-kerberos
    'principal': 'user@REALM.COM',
}

# Custom SSL configuration for secure WebHDFS
transport_params = {
    'verify': '/path/to/ca-cert.pem',
    'cert': ('/path/to/client-cert.pem', '/path/to/client-key.pem'),
}

# Reading HDFS data in PySpark application
import json  # was previously used below without being imported

from pyspark.sql import SparkSession
from smart_open import open

spark = SparkSession.builder.appName("HDFSReader").getOrCreate()

# Read metadata or small config files directly via smart_open
with open('hdfs:///user/config/spark-config.json') as f:
    config = json.load(f)

# Process large datasets with Spark itself
df = spark.read.parquet('hdfs:///user/data/large-dataset.parquet')

# Write summary results back to HDFS
with open('hdfs:///user/output/summary.txt', 'w') as f:
    f.write(f"Processed {df.count()} records")

# Hadoop Streaming job with smart-open
import sys
from smart_open import open

# Hadoop Streaming feeds mapper input on stdin, one record per line
for line in sys.stdin:
    result = process_line(line.strip())  # process_line: user-supplied mapper logic
    # Append intermediate results to HDFS.
    # NOTE(review): reopening the file for every input line is expensive;
    # consider opening once outside the loop if append semantics allow.
    with open('hdfs:///user/temp/intermediate-results.txt', 'a') as f:
        f.write(f"{result}\n")

# Read HDFS data for Kafka producers
from kafka import KafkaProducer
from smart_open import open

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Stream records from an HDFS file into a Kafka topic, one line per message
with open('hdfs:///user/streaming/data.jsonl', 'rb') as f:
    for line in f:
        producer.send('data-topic', line)

# Flush pending sends and release the connection
producer.flush()
producer.close()

# Use binary mode for better performance
with open('hdfs:///user/data/large-file.dat', 'rb') as f:
    # Process in 1 MB chunks instead of loading the whole file into memory
    while True:
        chunk = f.read(1024 * 1024)
        if not chunk:
            break
        process_chunk(chunk)

# Parallel processing with multiple HDFS files
import concurrent.futures
import glob

def process_hdfs_file(filepath):
    # One worker reads a single HDFS file and processes its contents
    with open(f'hdfs://{filepath}', 'rb') as f:
        return process_data(f.read())

# NOTE(review): glob.glob scans the *local* filesystem; to list actual HDFS
# paths use `hdfs dfs -ls` or an HDFS client library — confirm intent.
hdfs_files = glob.glob('/user/data/part-*')
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_hdfs_file, hdfs_files))

# Connection pooling for WebHDFS
import requests

# Reuse one HTTP session with a larger connection pool for many WebHDFS calls
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,
    pool_maxsize=20
)
session.mount('http://', adapter)
session.mount('https://', adapter)
transport_params = {'session': session}

# Batch operations over several files through the pooled session
files_to_process = [
    'webhdfs://namenode:50070/user/data/file1.txt',
    'webhdfs://namenode:50070/user/data/file2.txt',
    'webhdfs://namenode:50070/user/data/file3.txt'
]
for file_uri in files_to_process:
    with open(file_uri, 'rb', transport_params=transport_params) as f:
        process_file(f.read())

# Chunked uploads for large files
transport_params = {
    'min_part_size': 64 * 1024 * 1024,  # 64MB chunks
    'timeout': 300
}
with open('webhdfs://namenode:50070/user/output/huge-file.dat', 'wb',
          transport_params=transport_params) as f:
    for chunk in generate_large_data():
        f.write(chunk)

import subprocess
from smart_open import open

try:
    with open('hdfs:///user/data/file.txt', 'rb') as f:
        content = f.read()
except subprocess.CalledProcessError as e:
    # hdfs CLI failures surface as CalledProcessError; classify by stderr text
    if 'No such file or directory' in str(e):
        print("HDFS file not found")
    elif 'Permission denied' in str(e):
        print("HDFS permission denied")
    else:
        print(f"HDFS command failed: {e}")
except FileNotFoundError:
    # The hdfs binary itself was not found on PATH
    print("Hadoop CLI tools not found - ensure HADOOP_HOME is set")

import requests
from smart_open import open

try:
    with open('webhdfs://namenode:50070/user/data/file.txt', 'rb') as f:
        content = f.read()
except requests.exceptions.HTTPError as e:
    # Map common WebHDFS HTTP status codes to friendly messages
    status_code = e.response.status_code
    if status_code == 404:
        print("WebHDFS file not found")
    elif status_code == 403:
        print("WebHDFS access forbidden")
    elif status_code == 401:
        print("WebHDFS authentication required")
    else:
        print(f"WebHDFS HTTP error: {status_code}")
except requests.exceptions.ConnectionError:
    print("WebHDFS connection failed - check namenode availability")
except requests.exceptions.Timeout:
    print("WebHDFS request timed out")

# Kerberos authentication for HDFS
kinit user@REALM.COM

# Check current Kerberos ticket
klist

# Set Hadoop security configuration
# (the '#' comment must be separated from the value — a fused '#' would
# make the export value 'true#' plus stray arguments)
export HADOOP_SECURITY_AUTHENTICATION=kerberos
export HADOOP_SECURITY_AUTHORIZATION=true

# Simple authentication
# Simple (pseudo) authentication: pass the Hadoop user name
transport_params = {
    'user': 'hadoop-user'
}

# Delegation token authentication.
# NOTE(review): the WebHDFS REST parameter is named 'delegation'; an earlier
# example used 'delegation_token' — confirm which key smart_open expects.
transport_params = {
    'delegation': 'delegation-token-string'
}

# HTTPS WebHDFS with client certificates
transport_params = {
    'cert': '/path/to/client-cert.pem',
    'verify': '/path/to/ca-cert.pem'
}

# Custom authentication headers (e.g. JWT bearer tokens)
transport_params = {
    'headers': {
        'Authorization': 'Bearer jwt-token-here'
    }
}

import logging
from smart_open import open

# Enable debug logging to trace smart_open's internal activity
logging.basicConfig(level=logging.DEBUG)

# Check HDFS file status before opening
import subprocess
try:
    result = subprocess.run(['hdfs', 'dfs', '-stat', '%s', '/user/data/file.txt'],
                            capture_output=True, text=True, check=True)
    # '%s' asks hdfs dfs -stat for the file size in bytes
    file_size = int(result.stdout.strip())
    print(f"HDFS file size: {file_size} bytes")
except subprocess.CalledProcessError:
    print("HDFS file does not exist")

# Enable HTTP request debugging
import requests
import logging

# Enable verbose logging for the underlying HTTP stack
logging.basicConfig(level=logging.DEBUG)
# NOTE(review): "requests.packages.urllib3" is the legacy logger name; on
# modern requests versions the logger is just "urllib3" — confirm your version.
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True

# Check WebHDFS file status directly via the REST API
transport_params = {'timeout': 10}
try:
    response = requests.get(
        'http://namenode:50070/webhdfs/v1/user/data/file.txt?op=GETFILESTATUS',
        **transport_params  # expands to timeout=10
    )
    file_info = response.json()
    print(f"WebHDFS file info: {file_info}")
except Exception as e:  # broad catch is acceptable for a diagnostic snippet
    print(f"WebHDFS status check failed: {e}")

# Install with Tessl CLI
Install with the Tessl CLI: `npx tessl i tessl/pypi-smart-open`