Utilities for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and the local filesystem with transparent compression support.

Smart-open provides automatic and explicit compression/decompression for multiple formats, with streaming support that works seamlessly across all supported storage systems. You can also register custom compression handlers and control compression behavior explicitly.
def register_compressor(ext, callback):
    """Register a compression handler for a file extension.

    Parameters:
        ext: str - File extension with leading dot (e.g., '.gz', '.custom')
        callback: callable - Function accepting (file_obj, mode) and returning
            a wrapped file object

    Notes:
        The callback should return a file-like object that handles
        compression/decompression transparently. The mode parameter
        indicates 'rb'/'wb' for binary or text mode intent.
    """
def get_supported_compression_types():
    """Get the list of supported compression type identifiers.

    Returns:
        list[str] - Compression types such as
            ['gzip', 'bz2', 'zstandard', 'disable', 'infer_from_extension']
    """
def get_supported_extensions():
    """Get the list of supported compressed file extensions.

    Returns:
        list[str] - File extensions such as ['.gz', '.bz2', '.zst', '.xz', '.lzma']
    """
def compression_wrapper(file_obj, mode, compression='infer_from_extension', filename=None):
    """Wrap a file object with the appropriate compression handler.

    Parameters:
        file_obj: file-like object - Base file object to wrap
        mode: str - File mode, used to choose read vs. write compression behavior
        compression: str - Compression type, or 'infer_from_extension' to pick
            a handler from the filename's extension
        filename: str - Filename used for extension-based inference

    Returns:
        file-like object - Wrapped file object, or the original when no
            compression applies
    """
# Compression behavior constants
NO_COMPRESSION = 'disable'  # pass as compression= to read/write raw bytes
INFER_FROM_EXTENSION = 'infer_from_extension'  # default: pick handler from filename
Smart-open supports multiple compression formats out of the box:

- gzip (.gz) - Most common; good compression ratio and speed
- bzip2 (.bz2) - Higher compression ratio, slower
- zstandard (.zst) - Modern format; excellent compression and speed
- xz/lzma (.xz, .lzma) - High compression ratio
- lz4 (.lz4) - Very fast compression/decompression

from smart_open import open
# Compression automatically detected from file extension
with open('s3://bucket/data.txt.gz') as f:
    uncompressed_text = f.read()  # Automatically decompressed

with open('gs://bucket/logs.txt.bz2') as f:
    for line in f:  # Line-by-line decompression
        process_log_line(line)

# Writing compressed files (automatic compression)
with open('s3://bucket/output.txt.gz', 'w') as f:
    f.write('This will be compressed with gzip')

with open('azure://container/data.json.zst', 'w') as f:
    json.dump(large_data, f)  # Compressed with zstandard
# Explicitly specify compression type
with open('s3://bucket/data.txt', compression='gzip') as f:
    content = f.read()

# Disable compression for files with compression extensions
with open('s3://bucket/already-compressed.gz', compression='disable') as f:
    raw_compressed_data = f.read()  # Read as-is, no decompression

# Force compression on write
with open('gs://bucket/output.txt', 'w', compression='bz2') as f:
    f.write('This will be compressed with bzip2')
# Binary mode with compression
with open('s3://bucket/binary-data.dat.gz', 'rb') as f:
    decompressed_bytes = f.read()

with open('s3://bucket/output.bin.zst', 'wb') as f:
    f.write(binary_data)  # Compressed binary write

# Text mode with compression and encoding
with open('gs://bucket/unicode-text.txt.gz', encoding='utf-8') as f:
    unicode_text = f.read()

with open('azure://container/output.csv.bz2', 'w', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerows(data)
from smart_open import register_compressor
import lz4.frame


# Register LZ4 compression handler
def lz4_handler(file_obj, mode):
    """Wrap file_obj in an LZ4 frame reader or writer depending on mode."""
    if 'r' in mode:
        return lz4.frame.open(file_obj, mode='rb')
    return lz4.frame.open(file_obj, mode='wb')


register_compressor('.lz4', lz4_handler)

# Now .lz4 files work automatically
with open('s3://bucket/data.txt.lz4') as f:
    content = f.read()
# Custom compression with parameters
def custom_gzip_handler(file_obj, mode):
    """Gzip handler that opens text-mode streams with UTF-8 encoding.

    Reads use gzip defaults; writes use maximum compression (level 9).
    """
    import gzip
    if 'r' in mode:
        return gzip.open(file_obj, mode='rt', encoding='utf-8')
    return gzip.open(file_obj, mode='wt', encoding='utf-8', compresslevel=9)
register_compressor('.custom.gz', custom_gzip_handler)# For maximum compression (slower)
# For maximum compression (slower)
with open('s3://bucket/archive.txt.bz2', 'w') as f:
    f.write(large_text_data)

# For fastest compression/decompression
with open('s3://bucket/fast-access.txt.lz4', 'w') as f:
    f.write(frequently_accessed_data)

# Good balance of speed and compression
with open('s3://bucket/balanced.txt.zst', 'w') as f:
    f.write(general_purpose_data)

# Traditional web standard
with open('s3://bucket/web-compatible.txt.gz', 'w') as f:
    f.write(web_data)
f.write(web_data)# Stream large files with compression
with open('s3://bucket/huge-file.txt.gz') as f:
for line in f: # Memory-efficient line-by-line decompression
process_line(line)
# Chunked reading with compression
with open('gs://bucket/large-binary.dat.zst', 'rb') as f:
while True:
chunk = f.read(64 * 1024) # 64KB chunks, decompressed
if not chunk:
break
process_chunk(chunk)
# Streaming write with compression
with open('azure://container/stream-output.txt.gz', 'w') as f:
for record in generate_large_dataset():
f.write(f"{record}\n") # Compressed on-the-fly# Custom compression levels via transport_params
# Custom compression levels via transport_params
import gzip


def high_compression_gzip(file_obj, mode):
    """Gzip handler using maximum compression (level 9) on writes."""
    if 'r' in mode:
        return gzip.open(file_obj, mode='rt')
    return gzip.open(file_obj, mode='wt', compresslevel=9)
register_compressor('.high.gz', high_compression_gzip)
# Or use existing libraries with custom parameters
import bz2


def fast_bzip2(file_obj, mode):
    """Bzip2 handler tuned for speed (compresslevel=1) on writes."""
    if 'r' in mode:
        return bz2.open(file_obj, mode='rt')
    return bz2.open(file_obj, mode='wt', compresslevel=1)
register_compressor('.fast.bz2', fast_bzip2)# ETL pipeline with compression
# ETL pipeline with compression
def process_compressed_data():
    """Extract compressed CSV from S3, transform it, load compressed JSON back."""
    # Extract: Read compressed source data
    with open('s3://raw-data/input.csv.gz') as f:
        reader = csv.DictReader(f)
        data = list(reader)
    # Transform: Process data
    processed_data = transform_data(data)
    # Load: Write compressed output
    with open('s3://processed-data/output.json.zst', 'w') as f:
        json.dump(processed_data, f)


# Batch processing with different compression formats
input_files = [
    's3://data/file1.txt.gz',
    's3://data/file2.txt.bz2',
    's3://data/file3.txt.zst',
]
for input_file in input_files:
    with open(input_file) as f:  # Automatic decompression
        content = f.read()
    result = process_content(content)
    # Output with consistent compression
    output_file = input_file.replace('s3://data/', 's3://results/').replace('.txt.', '.result.')
    with open(output_file, 'w') as out_f:
        out_f.write(result)
# Compress backups with maximum compression
import json
from datetime import datetime

backup_data = collect_backup_data()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

# High compression for long-term storage
with open(f's3://backups/backup_{timestamp}.json.bz2', 'w') as f:
    json.dump(backup_data, f, separators=(',', ':'))

# Fast compression for recent backups
with open(f's3://recent-backups/backup_{timestamp}.json.lz4', 'w') as f:
    json.dump(backup_data, f, indent=2)
# Process compressed log files
import re
from collections import defaultdict

log_pattern = re.compile(r'(\d{4}-\d{2}-\d{2}) (\w+): (.+)')
stats = defaultdict(int)

# Read compressed logs from multiple sources
log_files = [
    's3://logs/app.log.gz',
    'gs://logs/app.log.bz2',
    'azure://logs/app.log.zst',
]
for log_file in log_files:
    with open(log_file) as f:
        for line in f:
            match = log_pattern.match(line.strip())
            if match:
                date, level, message = match.groups()
                stats[level] += 1

# Write compressed summary
with open('s3://reports/log-summary.json.gz', 'w') as f:
    json.dump(dict(stats), f)
import gzip
import bz2
import lzma
from smart_open import open

try:
    with open('s3://bucket/corrupted-file.txt.gz') as f:
        content = f.read()
except gzip.BadGzipFile:
    # BadGzipFile is an OSError subclass, so it must be caught before OSError
    print("Corrupted gzip file")
except lzma.LZMAError:
    print("Corrupted LZMA/XZ file")
except OSError:
    # The bz2 module has no dedicated exception class (bz2.BadBz2File does
    # not exist); corrupted bzip2 streams raise OSError.
    print("Corrupted bzip2 file")
except Exception as e:
    print(f"Other compression error: {e}")

# Fallback to uncompressed reading
try:
    with open('s3://bucket/maybe-compressed.txt') as f:
        content = f.read()
except Exception:
    # Try without decompression
    with open('s3://bucket/maybe-compressed.txt', compression='disable') as f:
        raw_content = f.read()
import hashlib


def verify_compressed_file(uri, expected_hash):
    """Verify the integrity of a file's streamed content.

    Parameters:
        uri: str - URI or path readable by open()
        expected_hash: str - Expected SHA-256 hex digest of the content

    Returns:
        bool - True if the SHA-256 digest of the content matches; False on
            mismatch or any read error (best-effort check).
    """
    hasher = hashlib.sha256()
    try:
        with open(uri, 'rb') as f:
            # Stream in 8 KiB chunks so large files never load fully into memory
            for chunk in iter(lambda: f.read(8192), b''):
                hasher.update(chunk)
        actual_hash = hasher.hexdigest()
        return actual_hash == expected_hash
    except Exception as e:
        # Deliberately broad: any failure to read is reported as "not verified"
        print(f"Verification failed: {e}")
        return False
# Usage
if verify_compressed_file('s3://bucket/data.txt.gz', expected_hash):
    print("File integrity verified")
else:
    print("File integrity check failed")
# Pre-compile compression handlers for repeated use
import gzip
import io


class OptimizedGzipHandler:
    """Reusable gzip compressor callback with a configurable write level.

    An instance can be passed to register_compressor(); calling it wraps a
    file object for reading or writing based on the mode.
    """

    def __init__(self, compresslevel=6):
        # Level used only on the write path; reads use gzip defaults
        self.compresslevel = compresslevel

    def __call__(self, file_obj, mode):
        if 'r' in mode:
            return gzip.open(file_obj, mode='rt')
        return gzip.open(file_obj, mode='wt', compresslevel=self.compresslevel)
# Register optimized handler
register_compressor('.opt.gz', OptimizedGzipHandler(compresslevel=9))

# Batch processing with consistent compression settings
files_to_process = ['file1.txt', 'file2.txt', 'file3.txt']
for filename in files_to_process:
    # Interpolate the loop's filename (the original had a stray placeholder here)
    with open(f's3://bucket/{filename}.opt.gz', 'w') as f:
        f.write(process_file(filename))
Install with the Tessl CLI: `npx tessl i tessl/pypi-smart-open`