tessl/pypi-zstandard

Zstandard bindings for Python providing high-performance compression and decompression operations

Frame Analysis

Utilities for analyzing zstd frames and extracting metadata without full decompression, enabling efficient frame inspection and validation.

Capabilities

Frame Content Size

Extract the original content size from a zstd frame header without decompressing the data.

def frame_content_size(data: bytes) -> int:
    """
    Get the original content size from a zstd frame.

    Parameters:
    - data: bytes, zstd frame data (at least frame header)

    Returns:
    int: Original content size in bytes, or -1 if the content size
         is not stored in the frame

    Raises:
    ZstdError: If the data is not a valid frame or the size cannot be determined
    """

Usage Example:

import zstandard as zstd

# Compressed data with content size in header
compressor = zstd.ZstdCompressor(write_content_size=True)
original_data = b"Hello, World!" * 1000
compressed = compressor.compress(original_data)

# Get content size without decompressing
try:
    content_size = zstd.frame_content_size(compressed)
except zstd.ZstdError as exc:
    # Raised when the frame header cannot be parsed
    print(f"Error reading frame: {exc}")
else:
    if content_size == -1:
        print("Content size not stored in frame")
    else:
        print(f"Original size: {content_size} bytes")
        print(f"Compressed size: {len(compressed)} bytes")
        print(f"Compression ratio: {len(original_data)/len(compressed):.2f}:1")
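To illustrate why no decompression is needed, the following sketch reads the Frame_Content_Size field directly from the header bytes, following the layout in the zstd frame format specification (RFC 8878). The helper `read_content_size` and the constant `ZSTD_MAGIC` are hypothetical names used only for this illustration; they are not part of the zstandard package, and the sketch handles standard (non-magicless) frames only.

```python
from typing import Optional

ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"  # standard frame magic number


def read_content_size(frame: bytes) -> Optional[int]:
    """Return the declared content size, or None if the header omits it.

    Hypothetical helper for illustration; handles standard frames only.
    """
    if not frame.startswith(ZSTD_MAGIC) or len(frame) == len(ZSTD_MAGIC):
        raise ValueError("not a standard zstd frame")

    descriptor = frame[4]
    fcs_flag = descriptor >> 6              # bits 7-6: Frame_Content_Size_flag
    single_segment = (descriptor >> 5) & 1  # bit 5: Single_Segment_flag
    dict_id_flag = descriptor & 0x3         # bits 1-0: Dictionary_ID_flag

    # Field width in bytes; flag 0 means 1 byte only for single-segment frames
    fcs_width = (single_segment, 2, 4, 8)[fcs_flag]
    if fcs_width == 0:
        return None

    # Skip the optional Window_Descriptor and Dictionary_ID fields
    start = 5 + (0 if single_segment else 1) + (0, 1, 2, 4)[dict_id_flag]
    field = frame[start:start + fcs_width]
    if len(field) != fcs_width:
        raise ValueError("truncated frame header")

    value = int.from_bytes(field, "little")
    # Two-byte fields store the size minus 256
    return value + 256 if fcs_width == 2 else value


# Hand-built frame: magic, descriptor 0x20, size 5, one raw block "hello"
tiny_frame = ZSTD_MAGIC + bytes([0x20, 0x05, 0x29, 0x00, 0x00]) + b"hello"
print(read_content_size(tiny_frame))  # 5
```

In practice `zstd.frame_content_size()` should be preferred; the sketch only shows that the value lives in the first few bytes of the frame.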

Frame Header Size

Get the size of a zstd frame header to skip to the compressed payload.

def frame_header_size(data: bytes) -> int:
    """
    Get the size of a zstd frame header.

    Parameters:
    - data: bytes, zstd frame data (at least frame header)

    Returns:
    int: Frame header size in bytes
    """

Usage Example:

import zstandard as zstd

# Any zstd frame works here; a small one is compressed for illustration
compressed_data = zstd.ZstdCompressor().compress(b"example payload")

# Get header size
header_size = zstd.frame_header_size(compressed_data)
print(f"Frame header size: {header_size} bytes")

# Split header and payload
header = compressed_data[:header_size]
payload = compressed_data[header_size:]

print(f"Header: {len(header)} bytes")
print(f"Payload: {len(payload)} bytes")
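The header size varies because several header fields are optional. As a rough sketch based on the zstd frame format specification (RFC 8878), the breakdown below derives each field's size from the Frame_Header_Descriptor byte. `header_layout` and `ZSTD_MAGIC` are hypothetical names for this illustration, not part of the zstandard package, and only standard frames (with a magic number) are handled. For such frames the total should agree with what `frame_header_size()` reports.

```python
ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"  # standard frame magic number


def header_layout(frame: bytes) -> dict:
    """Return the size in bytes of each header field, plus the total.

    Hypothetical helper for illustration; handles standard frames only.
    """
    if not frame.startswith(ZSTD_MAGIC) or len(frame) == len(ZSTD_MAGIC):
        raise ValueError("not a standard zstd frame")

    descriptor = frame[4]
    single_segment = (descriptor >> 5) & 1
    layout = {
        "magic": 4,
        "descriptor": 1,
        # Window_Descriptor is omitted for single-segment frames
        "window_descriptor": 0 if single_segment else 1,
        # Dictionary_ID_flag (bits 1-0) selects 0, 1, 2 or 4 bytes
        "dictionary_id": (0, 1, 2, 4)[descriptor & 0x3],
        # Frame_Content_Size_flag (bits 7-6) selects 0/1, 2, 4 or 8 bytes
        "content_size": (single_segment, 2, 4, 8)[descriptor >> 6],
    }
    layout["total"] = sum(layout.values())
    return layout


print(header_layout(ZSTD_MAGIC + b"\x20")["total"])  # 6
print(header_layout(ZSTD_MAGIC + b"\xc3")["total"])  # 18
```

Only the descriptor byte is needed to compute the size, which is why the function works on a five-byte prefix.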

Frame Parameters

Extract detailed parameters and metadata from a zstd frame header.

def get_frame_parameters(data: bytes, format: int = FORMAT_ZSTD1) -> FrameParameters:
    """
    Extract frame parameters from zstd frame header.

    Parameters:
    - data: bytes, zstd frame data (at least frame header)
    - format: int, expected frame format (FORMAT_ZSTD1, FORMAT_ZSTD1_MAGICLESS)

    Returns:
    FrameParameters: Object containing frame metadata
    """

class FrameParameters:
    """Container for zstd frame parameters and metadata."""
    
    @property
    def content_size(self) -> int:
        """Original content size (-1 if unknown)."""
    
    @property
    def window_size(self) -> int:
        """Window size used for compression."""
    
    @property
    def dict_id(self) -> int:
        """Dictionary ID (0 if no dictionary)."""
    
    @property
    def has_checksum(self) -> bool:
        """Whether frame includes content checksum."""

Usage Example:

import zstandard as zstd

# Create compressed data with various options
compressor = zstd.ZstdCompressor(
    level=5,
    write_content_size=True,
    write_checksum=True,
    write_dict_id=True
)

data = b"Sample data for frame analysis"
compressed = compressor.compress(data)

# Analyze frame parameters
params = zstd.get_frame_parameters(compressed)

print(f"Content size: {params.content_size}")
print(f"Window size: {params.window_size}")
print(f"Dictionary ID: {params.dict_id}")
print(f"Has checksum: {params.has_checksum}")

# Validate expectations
assert params.content_size == len(data)
assert params.has_checksum
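
For context on the window size value: in the zstd frame format specification (RFC 8878) the window size is stored as a single Window_Descriptor byte holding a 5-bit exponent and a 3-bit mantissa. The sketch below decodes that byte; `decode_window_descriptor` is a hypothetical helper for illustration and is not part of the zstandard package. Per the specification, single-segment frames omit this byte and use the content size as the window size.

```python
def decode_window_descriptor(byte: int) -> int:
    """Return the window size in bytes encoded by a Window_Descriptor byte.

    Hypothetical helper for illustration, following RFC 8878.
    """
    if byte not in range(256):
        raise ValueError("descriptor must be a single byte value")

    exponent = byte >> 3   # upper 5 bits
    mantissa = byte & 0x7  # lower 3 bits

    window_base = 2 ** (10 + exponent)          # minimum is 1 KiB
    window_add = (window_base // 8) * mantissa  # eighths of the base
    return window_base + window_add


print(decode_window_descriptor(0x00))  # 1024
print(decode_window_descriptor(0x50))  # 1048576
```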

Frame Format Detection

Handle different zstd frame formats including standard and magicless frames.

Usage Example:

import zstandard as zstd

# Standard frame with magic number
standard_compressor = zstd.ZstdCompressor()
standard_compressed = standard_compressor.compress(b"Standard frame data")

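# Magicless frame (compression parameters get their own name so they are
# not confused with the frame parameters read back below)
magicless_cparams = zstd.ZstdCompressionParameters(format=zstd.FORMAT_ZSTD1_MAGICLESS)
magicless_compressor = zstd.ZstdCompressor(compression_params=magicless_cparams)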
magicless_compressed = magicless_compressor.compress(b"Magicless frame data")

# Analyze different formats
standard_params = zstd.get_frame_parameters(standard_compressed, zstd.FORMAT_ZSTD1)
magicless_params = zstd.get_frame_parameters(magicless_compressed, zstd.FORMAT_ZSTD1_MAGICLESS)

print("Standard frame:")
print(f"  Content size: {standard_params.content_size}")
print(f"  Window size: {standard_params.window_size}")

print("Magicless frame:")
print(f"  Content size: {magicless_params.content_size}")
print(f"  Window size: {magicless_params.window_size}")
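
A magicless frame starts directly with the header descriptor, so it cannot be recognized from its bytes alone; the caller has to know which format to pass. What can be checked is whether data begins with the standard magic number or with one of the skippable-frame magic numbers defined in the zstd frame format specification (RFC 8878). The sketch below does that check. `classify_frame_start` and the constants are hypothetical names for this illustration and are not part of the zstandard package.

```python
STANDARD_MAGIC = 0xFD2FB528       # bytes 28 B5 2F FD, little-endian
SKIPPABLE_MAGIC_MIN = 0x184D2A50  # skippable frames use 0x184D2A50..0x184D2A5F
SKIPPABLE_MAGIC_MAX = 0x184D2A5F


def classify_frame_start(data: bytes) -> str:
    """Classify the first four bytes of a buffer (hypothetical helper)."""
    if len(data[:4]) != 4:
        return "too short"

    magic = int.from_bytes(data[:4], "little")
    if magic == STANDARD_MAGIC:
        return "standard"
    if magic in range(SKIPPABLE_MAGIC_MIN, SKIPPABLE_MAGIC_MAX + 1):
        return "skippable"
    # Could be a magicless frame or not zstd data at all
    return "unknown"


print(classify_frame_start(b"\x28\xb5\x2f\xfd\x20\x00"))  # standard
print(classify_frame_start(b"\x50\x2a\x4d\x18\x00\x00"))  # skippable
```

An "unknown" result is not an error by itself: data known to be magicless can still be passed to `get_frame_parameters()` with `FORMAT_ZSTD1_MAGICLESS`.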

Multi-Frame Analysis

Analyze compressed data containing multiple zstd frames.

Usage Example:

import zstandard as zstd

def analyze_multi_frame_data(data: bytes):
    """Analyze compressed data that may contain multiple frames."""
    frames = []
    offset = 0
    
    while offset < len(data):
        try:
            # Try to get frame parameters
            remaining_data = data[offset:]
            params = zstd.get_frame_parameters(remaining_data)
            
            # Get frame header size
            header_size = zstd.frame_header_size(remaining_data)
            
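            # Walk the 3-byte block headers to find the exact end of the frame
            # (zstd frame format: last-block bit, 2-bit type, 21-bit size)
            pos = header_size
            while True:
                if pos + 3 > len(remaining_data):
                    raise ValueError("truncated frame")
                block_header = int.from_bytes(remaining_data[pos:pos + 3], "little")
                block_type = (block_header >> 1) & 0x3
                block_size = block_header >> 3
                # RLE blocks (type 1) store a single byte regardless of block_size
                pos += 3 + (1 if block_type == 1 else block_size)
                if block_header & 1:  # last block in this frame
                    break
            # A 4-byte content checksum follows the last block when enabled
            frame_size = pos + (4 if params.has_checksum else 0)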
            
            frame_info = {
                'offset': offset,
                'header_size': header_size,
                'content_size': params.content_size,
                'window_size': params.window_size,
                'dict_id': params.dict_id,
                'has_checksum': params.has_checksum
            }
            frames.append(frame_info)
            
            offset += frame_size
            
        except Exception as e:
            print(f"Error analyzing frame at offset {offset}: {e}")
            break
    
    return frames

# Example usage
compressor = zstd.ZstdCompressor(write_content_size=True)
frame1 = compressor.compress(b"First frame data")
frame2 = compressor.compress(b"Second frame data")
frame3 = compressor.compress(b"Third frame data")

multi_frame_data = frame1 + frame2 + frame3
frames = analyze_multi_frame_data(multi_frame_data)

for i, frame in enumerate(frames):
    print(f"Frame {i+1}:")
    print(f"  Offset: {frame['offset']}")
    print(f"  Header size: {frame['header_size']}")
    print(f"  Content size: {frame['content_size']}")
    print(f"  Window size: {frame['window_size']}")

Frame Validation

Validate a frame's magic number and header without decompressing it. These checks do not cover the payload, so a frame that passes can still fail to decompress.

Usage Example:

import zstandard as zstd

def validate_frame(data: bytes) -> dict:
    """Validate a zstd frame and return analysis results."""
    result = {
        'valid': False,
        'error': None,
        'analysis': None
    }
    
    try:
        # Check minimum size
        if len(data) < 4:
            result['error'] = "Data too short for zstd frame"
            return result
        
        # Check magic number
        if data[:4] != zstd.FRAME_HEADER:
            result['error'] = "Invalid zstd magic number"
            return result
        
        # Get frame parameters
        params = zstd.get_frame_parameters(data)
        
        # Validate parameters
        if params.content_size == zstd.CONTENTSIZE_ERROR:
            result['error'] = "Error reading frame parameters"
            return result
        
        # Get header size
        header_size = zstd.frame_header_size(data)
        
        if header_size <= 0 or header_size > len(data):
            result['error'] = f"Invalid header size: {header_size}"
            return result
        
        result['valid'] = True
        result['analysis'] = {
            'header_size': header_size,
            'content_size': params.content_size,
            'window_size': params.window_size,
            'dict_id': params.dict_id,
            'has_checksum': params.has_checksum,
            'total_size': len(data)
        }
        
    except Exception as e:
        result['error'] = str(e)
    
    return result

# Example usage
compressor = zstd.ZstdCompressor(write_checksum=True)
valid_data = compressor.compress(b"Valid frame data")
invalid_data = b"Invalid frame data"

# Validate frames
valid_result = validate_frame(valid_data)
invalid_result = validate_frame(invalid_data)

print("Valid frame:", valid_result['valid'])
if valid_result['valid']:
    analysis = valid_result['analysis']
    print(f"  Header size: {analysis['header_size']}")
    print(f"  Content size: {analysis['content_size']}")
    print(f"  Has checksum: {analysis['has_checksum']}")

print("Invalid frame:", invalid_result['valid'])
if not invalid_result['valid']:
    print(f"  Error: {invalid_result['error']}")

Decompression Context Estimation

Estimate memory requirements for decompression without actually decompressing.

def estimate_decompression_context_size() -> int:
    """
    Estimate memory usage for decompression context.

    Returns:
    int: Estimated memory usage in bytes
    """

Usage Example:

import zstandard as zstd

# Estimate memory usage
estimated_memory = zstd.estimate_decompression_context_size()
print(f"Estimated decompression context size: {estimated_memory} bytes")

# Use for memory planning
def plan_decompression(compressed_frames: list[bytes]) -> dict:
    """Plan memory usage for batch decompression."""
    base_memory = zstd.estimate_decompression_context_size()
    
    total_compressed = sum(len(frame) for frame in compressed_frames)
    total_content_size = 0
    
    for frame in compressed_frames:
        try:
            content_size = zstd.frame_content_size(frame)
        except zstd.ZstdError:
            content_size = -1  # header could not be parsed
        if content_size >= 0:
            total_content_size += content_size
        else:
            # Content size unknown: fall back to a rough guess
            total_content_size += len(frame) * 4
    
    return {
        'base_memory': base_memory,
        'total_compressed': total_compressed,
        'estimated_decompressed': total_content_size,
        'peak_memory_estimate': base_memory + total_content_size
    }

# Example
compressor = zstd.ZstdCompressor()
frames = [compressor.compress(chunk) for chunk in (b"alpha" * 100, b"beta" * 100, b"gamma" * 100)]
plan = plan_decompression(frames)
print(f"Peak memory estimate: {plan['peak_memory_estimate']} bytes")

Constants

Frame analysis uses several constants for special values and format identification:

# Content size special values
CONTENTSIZE_UNKNOWN: int  # libzstd sentinel: content size not stored in frame
CONTENTSIZE_ERROR: int    # libzstd sentinel: error reading content size

# Frame format constants
FORMAT_ZSTD1: int           # Standard zstd format with magic number
FORMAT_ZSTD1_MAGICLESS: int # Zstd format without magic number

# Frame header magic number
FRAME_HEADER: bytes         # b"\x28\xb5\x2f\xfd"
MAGIC_NUMBER: int           # 0xFD2FB528, i.e. FRAME_HEADER read as a little-endian integer

Performance Notes

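  • Frame analysis reads only the frame header (at most 18 bytes in the standard format), so its cost does not depend on payload size
  • No decompression is performed, so these operations are suitable for scanning large numbers of frames
  • Header checks catch malformed or foreign data before decompression is attempted, but they do not verify the payload itself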
  • Content size information enables memory pre-allocation for better performance
  • Frame parameter analysis helps choose appropriate decompression settings

Install with Tessl CLI

npx tessl i tessl/pypi-zstandard
