CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-smart-open

Utils for streaming large files from S3, HDFS, GCS, SFTP, Azure Blob Storage, and local filesystem with transparent compression support

Pending
Overview
Eval results
Files

compression.mddocs/

Compression and Encoding

Automatic and explicit compression handling for multiple formats with streaming support. Smart-open provides transparent compression/decompression that works seamlessly across all supported storage systems.

Capabilities

Compression Management

Register custom compression handlers and manage compression behavior.

def register_compressor(ext, callback):
    """Register compression handler for file extension.
    
    Parameters:
        ext: str - File extension with leading dot (e.g., '.gz', '.custom')
        callback: callable - Function accepting (file_obj, mode) returning wrapped file object
        
    Notes:
        Callback should return file-like object that handles compression/decompression
        Mode parameter indicates 'rb'/'wb' for binary or text mode intent
    """

def get_supported_compression_types():
    """Get list of supported compression type identifiers.
    
    Returns:
        list[str] - Compression types like ['gzip', 'bz2', 'zstandard', 'disable', 'infer_from_extension']
    """

def get_supported_extensions():
    """Get list of supported compressed file extensions.
    
    Returns:
        list[str] - File extensions like ['.gz', '.bz2', '.zst', '.xz', '.lzma']
    """

def compression_wrapper(file_obj, mode, compression='infer_from_extension', filename=None):
    """Wrap file object with appropriate compression handler.
    
    Parameters:
        file_obj: file-like object - Base file object to wrap
        mode: str - File mode for compression behavior
        compression: str - Compression type or 'infer_from_extension'
        filename: str - Filename for extension-based inference
        
    Returns:
        file-like object - Wrapped or original file object
    """

Compression Constants

# Compression behavior constants
NO_COMPRESSION = 'disable'
INFER_FROM_EXTENSION = 'infer_from_extension'

Supported Compression Formats

Smart-open supports multiple compression formats out of the box:

  • gzip (.gz) - Most common, good compression ratio and speed
  • bzip2 (.bz2) - Higher compression ratio, slower
  • zstandard (.zst) - Modern format, excellent compression and speed
  • xz/lzma (.xz, .lzma) - High compression ratio
  • lz4 (.lz4) - Very fast compression/decompression

Usage Examples

Automatic Compression Detection

from smart_open import open

# Compression automatically detected from file extension
with open('s3://bucket/data.txt.gz') as f:
    uncompressed_text = f.read()  # Automatically decompressed

with open('gs://bucket/logs.txt.bz2') as f:
    for line in f:  # Line-by-line decompression
        process_log_line(line)

# Writing compressed files (automatic compression)
with open('s3://bucket/output.txt.gz', 'w') as f:
    f.write('This will be compressed with gzip')

with open('azure://container/data.json.zst', 'w') as f:
    json.dump(large_data, f)  # Compressed with zstandard

Explicit Compression Control

# Explicitly specify compression type
with open('s3://bucket/data.txt', compression='gzip') as f:
    content = f.read()

# Disable compression for files with compression extensions
with open('s3://bucket/already-compressed.gz', compression='disable') as f:
    raw_compressed_data = f.read()  # Read as-is, no decompression

# Force compression on write
with open('gs://bucket/output.txt', 'w', compression='bz2') as f:
    f.write('This will be compressed with bzip2')

Binary vs Text Mode with Compression

# Binary mode with compression
with open('s3://bucket/binary-data.dat.gz', 'rb') as f:
    decompressed_bytes = f.read()

with open('s3://bucket/output.bin.zst', 'wb') as f:
    f.write(binary_data)  # Compressed binary write

# Text mode with compression and encoding
with open('gs://bucket/unicode-text.txt.gz', encoding='utf-8') as f:
    unicode_text = f.read()

with open('azure://container/output.csv.bz2', 'w', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerows(data)

Custom Compression Handlers

from smart_open import register_compressor
import lz4.frame

# Register LZ4 compression handler
def lz4_handler(file_obj, mode):
    if 'r' in mode:
        return lz4.frame.open(file_obj, mode='rb')
    else:
        return lz4.frame.open(file_obj, mode='wb')

register_compressor('.lz4', lz4_handler)

# Now .lz4 files work automatically
with open('s3://bucket/data.txt.lz4') as f:
    content = f.read()

# Custom compression with parameters
def custom_gzip_handler(file_obj, mode):
    import gzip
    if 'r' in mode:
        return gzip.open(file_obj, mode='rt', encoding='utf-8')
    else:
        return gzip.open(file_obj, mode='wt', encoding='utf-8', compresslevel=9)

register_compressor('.custom.gz', custom_gzip_handler)

Performance Considerations

Compression Format Selection

# For maximum compression (slower)
with open('s3://bucket/archive.txt.bz2', 'w') as f:
    f.write(large_text_data)

# For fastest compression/decompression
with open('s3://bucket/fast-access.txt.lz4', 'w') as f:
    f.write(frequently_accessed_data)

# Good balance of speed and compression
with open('s3://bucket/balanced.txt.zst', 'w') as f:
    f.write(general_purpose_data)

# Traditional web standard
with open('s3://bucket/web-compatible.txt.gz', 'w') as f:
    f.write(web_data)

Streaming Compression

# Stream large files with compression
with open('s3://bucket/huge-file.txt.gz') as f:
    for line in f:  # Memory-efficient line-by-line decompression
        process_line(line)

# Chunked reading with compression
with open('gs://bucket/large-binary.dat.zst', 'rb') as f:
    while True:
        chunk = f.read(64 * 1024)  # 64KB chunks, decompressed
        if not chunk:
            break
        process_chunk(chunk)

# Streaming write with compression
with open('azure://container/stream-output.txt.gz', 'w') as f:
    for record in generate_large_dataset():
        f.write(f"{record}\n")  # Compressed on-the-fly

Compression Level Control

# Custom compression levels via transport_params
import gzip

def high_compression_gzip(file_obj, mode):
    if 'r' in mode:
        return gzip.open(file_obj, mode='rt')
    else:
        return gzip.open(file_obj, mode='wt', compresslevel=9)

register_compressor('.high.gz', high_compression_gzip)

# Or use existing libraries with custom parameters
import bz2

def fast_bzip2(file_obj, mode):
    if 'r' in mode:
        return bz2.open(file_obj, mode='rt')
    else:
        return bz2.open(file_obj, mode='wt', compresslevel=1)

register_compressor('.fast.bz2', fast_bzip2)

Integration Examples

Data Pipeline Integration

# ETL pipeline with compression
def process_compressed_data():
    # Extract: Read compressed source data
    with open('s3://raw-data/input.csv.gz') as f:
        reader = csv.DictReader(f)
        data = list(reader)
    
    # Transform: Process data
    processed_data = transform_data(data)
    
    # Load: Write compressed output
    with open('s3://processed-data/output.json.zst', 'w') as f:
        json.dump(processed_data, f)

# Batch processing with different compression formats
input_files = [
    's3://data/file1.txt.gz',
    's3://data/file2.txt.bz2', 
    's3://data/file3.txt.zst'
]

for input_file in input_files:
    with open(input_file) as f:  # Automatic decompression
        content = f.read()
        result = process_content(content)
        
        # Output with consistent compression
        output_file = input_file.replace('s3://data/', 's3://results/').replace('.txt.', '.result.')
        with open(output_file, 'w') as out_f:
            out_f.write(result)

Backup and Archival

# Compress backups with maximum compression
import json
from datetime import datetime

backup_data = collect_backup_data()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

# High compression for long-term storage
with open(f's3://backups/backup_{timestamp}.json.bz2', 'w') as f:
    json.dump(backup_data, f, separators=(',', ':'))

# Fast compression for recent backups
with open(f's3://recent-backups/backup_{timestamp}.json.lz4', 'w') as f:
    json.dump(backup_data, f, indent=2)

Log Processing

# Process compressed log files
import re
from collections import defaultdict

log_pattern = re.compile(r'(\d{4}-\d{2}-\d{2}) (\w+): (.+)')
stats = defaultdict(int)

# Read compressed logs from multiple sources
log_files = [
    's3://logs/app.log.gz',
    'gs://logs/app.log.bz2',
    'azure://logs/app.log.zst'
]

for log_file in log_files:
    with open(log_file) as f:
        for line in f:
            match = log_pattern.match(line.strip())
            if match:
                date, level, message = match.groups()
                stats[level] += 1

# Write compressed summary
with open('s3://reports/log-summary.json.gz', 'w') as f:
    json.dump(dict(stats), f)

Error Handling

Compression-Specific Errors

import gzip
import bz2
import lzma
from smart_open import open

try:
    with open('s3://bucket/corrupted-file.txt.gz') as f:
        content = f.read()
except gzip.BadGzipFile:
    print("Corrupted gzip file")
except bz2.BadBz2File:
    print("Corrupted bzip2 file") 
except lzma.LZMAError:
    print("Corrupted LZMA/XZ file")
except Exception as e:
    print(f"Other compression error: {e}")

# Fallback to uncompressed reading
try:
    with open('s3://bucket/maybe-compressed.txt') as f:
        content = f.read()
except Exception:
    # Try without decompression
    with open('s3://bucket/maybe-compressed.txt', compression='disable') as f:
        raw_content = f.read()

Validation and Integrity

import hashlib

def verify_compressed_file(uri, expected_hash):
    """Verify integrity of compressed file content."""
    hasher = hashlib.sha256()
    
    try:
        with open(uri, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                hasher.update(chunk)
        
        actual_hash = hasher.hexdigest()
        return actual_hash == expected_hash
    except Exception as e:
        print(f"Verification failed: {e}")
        return False

# Usage
if verify_compressed_file('s3://bucket/data.txt.gz', expected_hash):
    print("File integrity verified")
else:
    print("File integrity check failed")

Best Practices

Format Selection Guidelines

  1. Use gzip (.gz) for web compatibility and general use
  2. Use zstandard (.zst) for best balance of speed and compression
  3. Use bzip2 (.bz2) for maximum compression when storage space is critical
  4. Use lz4 (.lz4) for maximum speed when compression ratio is less important
  5. Use xz (.xz) for archival data requiring maximum compression

Performance Optimization

# Pre-compile compression handlers for repeated use
import gzip
import io

class OptimizedGzipHandler:
    def __init__(self, compresslevel=6):
        self.compresslevel = compresslevel
    
    def __call__(self, file_obj, mode):
        if 'r' in mode:
            return gzip.open(file_obj, mode='rt')
        else:
            return gzip.open(file_obj, mode='wt', 
                           compresslevel=self.compresslevel)

# Register optimized handler
register_compressor('.opt.gz', OptimizedGzipHandler(compresslevel=9))

# Batch processing with consistent compression settings
files_to_process = ['file1.txt', 'file2.txt', 'file3.txt']
for filename in files_to_process:
    with open(f's3://bucket/{filename}.opt.gz', 'w') as f:
        f.write(process_file(filename))

Install with Tessl CLI

npx tessl i tessl/pypi-smart-open

docs

big-data.md

cloud-storage.md

compression.md

core-operations.md

index.md

network-access.md

utilities.md

tile.json