Zstandard bindings for Python providing high-performance compression and decompression operations
Utilities for analyzing zstd frames and extracting metadata without full decompression, enabling efficient frame inspection and validation.
Extract the original content size from a zstd frame header without decompressing the data.
def frame_content_size(data: bytes) -> int:
"""
Get the original content size from a zstd frame.
Parameters:
- data: bytes, zstd frame data (at least frame header)
Returns:
int: Original content size in bytes, or special values:
- CONTENTSIZE_UNKNOWN: Content size not stored in frame
- CONTENTSIZE_ERROR: Invalid frame or unable to determine size
"""

Usage Example:
import zstandard as zstd
# Compressed data with content size in header
compressor = zstd.ZstdCompressor(write_content_size=True)
original_data = b"Hello, World!" * 1000
compressed = compressor.compress(original_data)
# Get content size without decompressing
content_size = zstd.frame_content_size(compressed)
if content_size == zstd.CONTENTSIZE_UNKNOWN:
print("Content size not stored in frame")
elif content_size == zstd.CONTENTSIZE_ERROR:
print("Error reading frame")
else:
print(f"Original size: {content_size} bytes")
print(f"Compressed size: {len(compressed)} bytes")
print(f"Compression ratio: {len(original_data)/len(compressed):.2f}:1")

Get the size of a zstd frame header to skip to the compressed payload.
def frame_header_size(data: bytes) -> int:
"""
Get the size of a zstd frame header.
Parameters:
- data: bytes, zstd frame data (at least frame header)
Returns:
int: Frame header size in bytes
"""

Usage Example:
import zstandard as zstd
compressed_data = b"..." # zstd compressed data
# Get header size
header_size = zstd.frame_header_size(compressed_data)
print(f"Frame header size: {header_size} bytes")
# Split header and payload
header = compressed_data[:header_size]
payload = compressed_data[header_size:]
print(f"Header: {len(header)} bytes")
print(f"Payload: {len(payload)} bytes")

Extract detailed parameters and metadata from a zstd frame header.
def get_frame_parameters(data: bytes, format: int = FORMAT_ZSTD1) -> FrameParameters:
"""
Extract frame parameters from zstd frame header.
Parameters:
- data: bytes, zstd frame data (at least frame header)
- format: int, expected frame format (FORMAT_ZSTD1, FORMAT_ZSTD1_MAGICLESS)
Returns:
FrameParameters: Object containing frame metadata
"""
class FrameParameters:
"""Container for zstd frame parameters and metadata."""
@property
def content_size(self) -> int:
"""Original content size (-1 if unknown)."""
@property
def window_size(self) -> int:
"""Window size used for compression."""
@property
def dict_id(self) -> int:
"""Dictionary ID (0 if no dictionary)."""
@property
def has_checksum(self) -> bool:
"""Whether frame includes content checksum."""

Usage Example:
import zstandard as zstd
# Create compressed data with various options
compressor = zstd.ZstdCompressor(
level=5,
write_content_size=True,
write_checksum=True,
write_dict_id=True
)
data = b"Sample data for frame analysis"
compressed = compressor.compress(data)
# Analyze frame parameters
params = zstd.get_frame_parameters(compressed)
print(f"Content size: {params.content_size}")
print(f"Window size: {params.window_size}")
print(f"Dictionary ID: {params.dict_id}")
print(f"Has checksum: {params.has_checksum}")
# Validate expectations
assert params.content_size == len(data)
assert params.has_checksum == True

Handle different zstd frame formats, including standard and magicless frames.
Usage Example:
import zstandard as zstd
# Standard frame with magic number
standard_compressor = zstd.ZstdCompressor()
standard_compressed = standard_compressor.compress(b"Standard frame data")
# Magicless frame
magicless_params = zstd.ZstdCompressionParameters(format=zstd.FORMAT_ZSTD1_MAGICLESS)
magicless_compressor = zstd.ZstdCompressor(compression_params=magicless_params)
magicless_compressed = magicless_compressor.compress(b"Magicless frame data")
# Analyze different formats
standard_params = zstd.get_frame_parameters(standard_compressed, zstd.FORMAT_ZSTD1)
magicless_params = zstd.get_frame_parameters(magicless_compressed, zstd.FORMAT_ZSTD1_MAGICLESS)
print("Standard frame:")
print(f" Content size: {standard_params.content_size}")
print(f" Window size: {standard_params.window_size}")
print("Magicless frame:")
print(f" Content size: {magicless_params.content_size}")
print(f" Window size: {magicless_params.window_size}")

Analyze compressed data containing multiple zstd frames.
Usage Example:
import zstandard as zstd
def _frame_compressed_size(frame, header_size: int, has_checksum: bool) -> int:
    """
    Measure the complete zstd frame that starts at the beginning of `frame`.

    Walks the chain of 3-byte block headers that follows the frame header
    (Zstandard frame format, RFC 8878); nothing is decompressed.

    Parameters:
    - frame: bytes-like, data starting at the first byte of the frame
    - header_size: int, size of the frame header (magic number included)
    - has_checksum: bool, whether a 4-byte content checksum ends the frame

    Returns:
    int: Size in bytes of header + blocks + optional checksum

    Raises:
    ValueError: the data ends before the frame does, or a block uses the
    reserved block type
    """
    position = header_size
    while True:
        block_header = frame[position:position + 3]
        if len(block_header) < 3:
            raise ValueError("truncated frame: incomplete block header")
        # Little-endian layout: bit 0 = last block, bits 1-2 = type, bits 3-23 = size
        fields = int.from_bytes(block_header, "little")
        is_last = fields & 1
        block_type = (fields >> 1) & 0b11
        block_size = fields >> 3
        if block_type == 3:
            raise ValueError("reserved block type in frame")
        # An RLE block (type 1) stores one byte no matter how large its size field is
        position += 3 + (1 if block_type == 1 else block_size)
        if is_last:
            break
    if has_checksum:
        position += 4
    if position > len(frame):
        raise ValueError("truncated frame: data ends before the frame does")
    return position


def analyze_multi_frame_data(data: bytes):
    """
    Analyze compressed data that may contain multiple frames.

    Each frame's exact size is obtained by walking its block headers, so
    offsets stay correct for every frame and frames without a stored content
    size are handled as well. Scanning stops, after printing a message, at the
    first position that cannot be parsed as a standard zstd frame.

    Parameters:
    - data: bytes, one or more concatenated zstd frames

    Returns:
    list: One dict per frame with keys 'offset', 'header_size', 'frame_size',
    'content_size', 'window_size', 'dict_id' and 'has_checksum'
    """
    frames = []
    view = memoryview(data)  # slicing a view avoids copying the tail on every pass
    offset = 0
    while offset < len(view):
        remaining = view[offset:]
        try:
            params = zstd.get_frame_parameters(remaining)
            header_size = zstd.frame_header_size(remaining)
            frame_size = _frame_compressed_size(
                remaining, header_size, params.has_checksum
            )
        except (zstd.ZstdError, ValueError) as e:
            print(f"Error analyzing frame at offset {offset}: {e}")
            break
        frames.append({
            'offset': offset,
            'header_size': header_size,
            'frame_size': frame_size,
            'content_size': params.content_size,
            'window_size': params.window_size,
            'dict_id': params.dict_id,
            'has_checksum': params.has_checksum,
        })
        # frame_size is at least header_size + 3, so the loop always advances
        offset += frame_size
    return frames
# Example usage
compressor = zstd.ZstdCompressor(write_content_size=True)
frame1 = compressor.compress(b"First frame data")
frame2 = compressor.compress(b"Second frame data")
frame3 = compressor.compress(b"Third frame data")
multi_frame_data = frame1 + frame2 + frame3
frames = analyze_multi_frame_data(multi_frame_data)
for i, frame in enumerate(frames):
print(f"Frame {i+1}:")
print(f" Offset: {frame['offset']}")
print(f" Header size: {frame['header_size']}")
print(f" Content size: {frame['content_size']}")
print(f" Window size: {frame['window_size']}")

Validate frame integrity and format without full decompression.
Usage Example:
import zstandard as zstd
def validate_frame(data: bytes) -> dict:
    """
    Check the header of a zstd frame and report what was found.

    Parameters:
    - data: bytes, zstd frame data to inspect

    Returns:
    dict: 'valid' (bool), 'error' (message or None) and 'analysis'
    (dict of header fields when valid, otherwise None)
    """
    outcome = {'valid': False, 'error': None, 'analysis': None}

    def rejected(reason):
        # Record why validation stopped and hand back the shared result dict
        outcome['error'] = reason
        return outcome

    try:
        # The magic number alone needs four bytes
        if len(data) < 4:
            return rejected("Data too short for zstd frame")

        if data[:4] != zstd.FRAME_HEADER:
            return rejected("Invalid zstd magic number")

        frame_params = zstd.get_frame_parameters(data)
        if frame_params.content_size == zstd.CONTENTSIZE_ERROR:
            return rejected("Error reading frame parameters")

        header_len = zstd.frame_header_size(data)
        # The header must be non-empty and fit inside the supplied data
        if not (0 < header_len <= len(data)):
            return rejected(f"Invalid header size: {header_len}")

        outcome['valid'] = True
        outcome['analysis'] = {
            'header_size': header_len,
            'content_size': frame_params.content_size,
            'window_size': frame_params.window_size,
            'dict_id': frame_params.dict_id,
            'has_checksum': frame_params.has_checksum,
            'total_size': len(data),
        }
    except Exception as exc:
        return rejected(str(exc))
    return outcome
# Example usage
compressor = zstd.ZstdCompressor(write_checksum=True)
valid_data = compressor.compress(b"Valid frame data")
invalid_data = b"Invalid frame data"
# Validate frames
valid_result = validate_frame(valid_data)
invalid_result = validate_frame(invalid_data)
print("Valid frame:", valid_result['valid'])
if valid_result['valid']:
analysis = valid_result['analysis']
print(f" Header size: {analysis['header_size']}")
print(f" Content size: {analysis['content_size']}")
print(f" Has checksum: {analysis['has_checksum']}")
print("Invalid frame:", invalid_result['valid'])
if not invalid_result['valid']:
print(f" Error: {invalid_result['error']}")

Estimate memory requirements for decompression without actually decompressing.
def estimate_decompression_context_size() -> int:
"""
Estimate memory usage for decompression context.
Returns:
int: Estimated memory usage in bytes
"""

Usage Example:
import zstandard as zstd
# Estimate memory usage
estimated_memory = zstd.estimate_decompression_context_size()
print(f"Estimated decompression context size: {estimated_memory} bytes")
# Use for memory planning
def plan_decompression(compressed_frames: list[bytes]) -> dict:
    """
    Plan memory usage for batch decompression.

    A frame whose content size cannot be read (zstd error, or a negative or
    sentinel "unknown" value) is counted as four times its compressed length,
    a rough estimate.

    Parameters:
    - compressed_frames: list of bytes, one zstd frame per element

    Returns:
    dict: 'base_memory', 'total_compressed', 'estimated_decompressed' and
    'peak_memory_estimate', all in bytes
    """
    base_memory = zstd.estimate_decompression_context_size()
    # Sentinel constants must never be added to the total as if they were sizes
    unknown_sizes = (zstd.CONTENTSIZE_UNKNOWN, zstd.CONTENTSIZE_ERROR)
    total_compressed = 0
    total_content_size = 0
    for frame in compressed_frames:
        total_compressed += len(frame)
        try:
            content_size = zstd.frame_content_size(frame)
        except zstd.ZstdError:
            # Unreadable header: fall through to the estimate below
            content_size = -1
        if content_size < 0 or content_size in unknown_sizes:
            content_size = len(frame) * 4  # rough estimate
        total_content_size += content_size
    return {
        'base_memory': base_memory,
        'total_compressed': total_compressed,
        'estimated_decompressed': total_content_size,
        'peak_memory_estimate': base_memory + total_content_size,
    }
# Example
frames = [compressed1, compressed2, compressed3]
plan = plan_decompression(frames)
print(f"Peak memory estimate: {plan['peak_memory_estimate']} bytes")

Frame analysis uses several constants for special values and format identification:
# Content size special values
CONTENTSIZE_UNKNOWN: int # Content size not stored in frame
CONTENTSIZE_ERROR: int # Error reading content size
# Frame format constants
FORMAT_ZSTD1: int # Standard zstd format with magic number
FORMAT_ZSTD1_MAGICLESS: int # Zstd format without magic number
# Frame header magic number
FRAME_HEADER: bytes # b"\x28\xb5\x2f\xfd"
MAGIC_NUMBER: int # Magic number as integer

Install with Tessl CLI
npx tessl i tessl/pypi-zstandard