CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-zstandard

Zstandard bindings for Python providing high-performance compression and decompression operations

Overview
Eval results
Files

docs/advanced-compression.md

Advanced Compression

Sophisticated compression capabilities including customizable parameters, streaming interfaces, dictionary support, and multi-threading for high-performance applications.

Capabilities

ZstdCompressor

Main compression class providing full control over compression parameters and advanced compression modes.

class ZstdCompressor:
    def __init__(
        self,
        level: int = 3,
        dict_data: ZstdCompressionDict = None,
        compression_params: ZstdCompressionParameters = None,
        write_checksum: bool = None,
        write_content_size: bool = None,
        write_dict_id: bool = None,
        threads: int = 0
    ):
        """
        Create a compression context.

        Parameters:
        - level: int, compression level (default 3). The maximum is 22;
          negative values select the faster, lower-ratio levels. Higher
          levels are slower but compress better
        - dict_data: ZstdCompressionDict, compression dictionary
        - compression_params: ZstdCompressionParameters, detailed parameters.
          Meant as an alternative to level / write_* / threads, not an
          addition (NOTE(review): upstream raises ValueError when it is
          combined with the write_* flags or threads - confirm for your
          version)
        - write_checksum: bool, include integrity checksum
        - write_content_size: bool, write original size in frame header
        - write_dict_id: bool, write dictionary ID in frame header
        - threads: int, number of threads for compression
          (0 = multi-threaded compression disabled, the default;
          -1 = one thread per detected logical CPU)
        """

    def memory_size(self) -> int:
        """Get memory usage of compression context in bytes."""

    def compress(self, data: bytes) -> bytes:
        """
        Compress data in one operation.

        Parameters:
        - data: bytes-like object to compress

        Returns:
        bytes: Compressed data
        """

    def compressobj(self, size: int = -1) -> ZstdCompressionObj:
        """
        Create a compression object for streaming operations.

        Parameters:
        - size: int, total size in bytes of the data that will be compressed
          (-1 = unknown)

        Returns:
        ZstdCompressionObj: Streaming compression object
        """

    def chunker(self, size: int = -1, chunk_size: int = -1) -> ZstdCompressionChunker:
        """
        Create a compression chunker for processing data in chunks.

        Parameters:
        - size: int, total size in bytes of the data that will be compressed
          (-1 = unknown)
        - chunk_size: int, size of each emitted compressed chunk; the last
          chunk may be smaller

        Returns:
        ZstdCompressionChunker: Chunking compression interface
        """

Usage Example:

import zstandard as zstd

# Basic compressor
compressor = zstd.ZstdCompressor(level=10)
compressed = compressor.compress(b"Data to compress")

# High-performance compressor with threading
# Stand-in for a real large input (16 bytes * 65536 = 1 MiB); the original
# snippet used `large_data` without defining it.
large_data = b"Example payload " * 65536
compressor = zstd.ZstdCompressor(level=3, threads=4)
compressed = compressor.compress(large_data)

# Memory usage monitoring
print(f"Compressor memory usage: {compressor.memory_size()} bytes")

Streaming Compression

Stream-based compression for handling large data without loading everything into memory.

class ZstdCompressor:
    def stream_writer(
        self,
        writer,
        size: int = -1,
        write_size: int = -1,
        write_return_read: bool = False,
        closefd: bool = True
    ) -> ZstdCompressionWriter:
        """
        Create a streaming compression writer.

        The returned object can be used as a context manager; leaving the
        ``with`` block closes it, which finishes the zstd frame.

        Parameters:
        - writer: file-like object to write compressed data to
        - size: int, total size in bytes of the data that will be written
          (-1 = unknown)
        - write_size: int, preferred size of each write() issued to `writer`
        - write_return_read: bool, controls what the returned object's
          write() reports: True = number of input bytes consumed,
          False = number of compressed bytes written to `writer`
          (NOTE(review): upstream releases that have `closefd` are believed
          to default this to True - confirm against the installed version)
        - closefd: bool, whether closing the returned writer (including on
          context-manager exit) also closes `writer`. Pass False when the
          destination must stay usable afterwards, e.g. to call
          io.BytesIO.getvalue()

        Returns:
        ZstdCompressionWriter: Streaming compression writer
        """

    def stream_reader(
        self,
        source,
        size: int = -1,
        read_size: int = -1,
        closefd: bool = True
    ) -> ZstdCompressionReader:
        """
        Create a streaming compression reader.

        Reading from the returned object yields compressed bytes produced
        from the uncompressed `source`.

        Parameters:
        - source: file-like object or bytes to read from
        - size: int, total size in bytes of the source data (-1 = unknown)
        - read_size: int, preferred size of each read() issued to `source`
        - closefd: bool, whether closing the returned reader also closes
          `source`

        Returns:
        ZstdCompressionReader: Streaming compression reader
        """

    def copy_stream(
        self,
        ifh,
        ofh,
        size: int = -1,
        read_size: int = -1,
        write_size: int = -1
    ) -> tuple[int, int]:
        """
        Copy and compress data between streams.

        Parameters:
        - ifh: input file-like object (read as uncompressed data)
        - ofh: output file-like object (receives compressed data)
        - size: int, total size in bytes of the input (-1 = unknown)
        - read_size: int, read buffer size
        - write_size: int, write buffer size

        Returns:
        tuple[int, int]: (bytes_read, bytes_written)
        """

Usage Examples:

import zstandard as zstd
import io

compressor = zstd.ZstdCompressor(level=5)

# Stream writer - compress data as you write
output = io.BytesIO()
# closefd=False: leaving the `with` block closes the writer (finishing the
# zstd frame). With the default closefd=True it would also close `output`,
# and the getvalue() call below would raise
# "ValueError: I/O operation on closed file".
with compressor.stream_writer(output, closefd=False) as writer:
    writer.write(b"First chunk of data")
    writer.write(b"Second chunk of data")
    writer.write(b"Final chunk")

compressed_data = output.getvalue()

# Stream reader - compress data as you read
data = b"Large amount of data to compress"
reader = compressor.stream_reader(io.BytesIO(data))
compressed_chunks = []
while True:
    chunk = reader.read(8192)
    if not chunk:
        break
    compressed_chunks.append(chunk)

# Copy between streams with compression
with open('input.txt', 'rb') as input_file, \
     open('output.zst', 'wb') as output_file:
    bytes_read, bytes_written = compressor.copy_stream(input_file, output_file)
    print(f"Read {bytes_read} bytes, wrote {bytes_written} bytes")

Iterative Compression

Compress data in chunks and yield compressed output incrementally, useful for processing large data streams.

class ZstdCompressor:
    def read_to_iter(
        self,
        reader,
        size: int = -1,
        read_size: int = -1,
        write_size: int = -1,
    ) -> Generator[bytes, None, None]:
        """
        Lazily compress everything available from `reader`.

        Input is pulled from `reader` and compressed output is handed back
        one chunk at a time, so neither side has to be held in memory whole.

        Parameters:
        - reader: source of uncompressed data (file-like object or bytes)
        - size: int, total size hint for the input data
        - read_size: int, size of the read buffer
        - write_size: int, size of the write buffer

        Yields:
        bytes: successive chunks of compressed data
        """

Usage Example:

import zstandard as zstd
import io

compressor = zstd.ZstdCompressor(level=5)


def process_compressed_chunk(chunk):
    """Placeholder consumer: replace with a socket send, file write, etc."""
    print(f"Got {len(chunk)} compressed bytes")


# Process large data iteratively
large_data = b"Very large data content that needs streaming compression..."
reader = io.BytesIO(large_data)

# Compress and process chunks as they're produced
compressed_chunks = []
for chunk in compressor.read_to_iter(reader):
    # Handling each chunk as it arrives is what keeps memory usage low...
    process_compressed_chunk(chunk)
    # ...so only keep the chunks around if the combined output is needed.
    compressed_chunks.append(chunk)

# Combine all chunks if needed
final_compressed = b''.join(compressed_chunks)

Multi-Threading Compression

Parallel compression for improved performance on multi-core systems.

class ZstdCompressor:
    def multi_compress_to_buffer(
        self,
        data,
        threads: int = 0
    ) -> BufferWithSegmentsCollection:
        """
        Compress multiple data items in a single call, optionally in parallel.

        Each item is compressed on its own with this compressor's settings.

        Parameters:
        - data: BufferWithSegments, BufferWithSegmentsCollection, or list of bytes
        - threads: int, number of threads (0 = use a single thread, the
          default; negative = one thread per detected logical CPU)

        Returns:
        BufferWithSegmentsCollection: Collection of compressed segments,
        one per input item

        NOTE(review): upstream documents this batch API as available only in
        the C extension backend, not the CFFI one - confirm before relying
        on it in portable code.
        """

Usage Example:

import zstandard as zstd

# One compressor is shared by every item in the batch
compressor = zstd.ZstdCompressor(level=3)

# Inputs to compress; each one becomes its own compressed segment
data_items = [
    b"First piece of data to compress",
    b"Second piece of data to compress",
    b"Third piece of data to compress",
]

# Hand the whole batch over and let 4 worker threads compress it
result = compressor.multi_compress_to_buffer(data_items, threads=4)

# Walk the resulting collection by index and pull out each segment
segment_count = len(result)
for i in range(segment_count):
    segment = result[i]
    print(f"Segment {i}: {len(segment)} bytes")
    compressed_data = segment.tobytes()

Frame Progression Monitoring

Monitor compression progress and statistics during multi-threaded operations.

class ZstdCompressor:
    def frame_progression(self) -> tuple[int, int, int]:
        """
        Get information on how much work the compressor has done.

        Returns:
        tuple[int, int, int]: (ingested, consumed, produced)
        - ingested: input bytes handed to the compressor so far
        - consumed: input bytes actually compressed so far
        - produced: compressed output bytes generated so far
        """

Usage Example:

import zstandard as zstd

compressor = zstd.ZstdCompressor(level=5, threads=4)

# Start compression
data = b"Large data to monitor compression progress"
compressed = compressor.compress(data)

# Get progression statistics.
# The tuple is (ingested, consumed, produced): input bytes accepted, input
# bytes actually compressed, and compressed bytes generated.
ingested, consumed, produced = compressor.frame_progression()
print(f"Ingested: {ingested}, Consumed: {consumed}, Produced: {produced}")

Compression Parameters

Fine-grained control over compression behavior through detailed parameter configuration.

class ZstdCompressionParameters:
    def __init__(
        self,
        format: int = FORMAT_ZSTD1,
        compression_level: int = 3,
        window_log: int = 0,
        hash_log: int = 0,
        chain_log: int = 0,
        search_log: int = 0,
        min_match: int = 0,
        target_length: int = 0,
        strategy: int = 0,
        write_content_size: int = -1,
        write_checksum: int = -1,
        write_dict_id: int = -1,
        job_size: int = 0,
        overlap_log: int = 0,
        force_max_window: int = 0,
        enable_ldm: int = 0,
        ldm_hash_log: int = 0,
        ldm_min_match: int = 0,
        ldm_bucket_size_log: int = 0,
        ldm_hash_rate_log: int = 0,
        threads: int = 0
    ):
        """
        Create detailed compression parameters.

        NOTE(review): several defaults in this signature (compression_level,
        strategy, write_content_size, write_checksum, write_dict_id,
        overlap_log, ldm_hash_rate_log) could not be confirmed against the
        upstream implementation - verify them against the installed
        zstandard version before relying on them.

        Parameters:
        - format: int, compression format (FORMAT_ZSTD1, FORMAT_ZSTD1_MAGICLESS)
        - compression_level: int, compression level (maximum 22; negative
          values select the faster, lower-ratio levels)
        - window_log: int, window size as power of 2 (10-31)
        - hash_log: int, hash table size as power of 2 (6-26)
        - chain_log: int, chain table size as power of 2 (6-28)
        - search_log: int, search length as power of 2 (1-26)
        - min_match: int, minimum match length (3-7)
        - target_length: int, target match length (0-999)
        - strategy: int, compression strategy (STRATEGY_*)
        - write_content_size: int, write content size in frame header
          (0=no, 1=yes)
        - write_checksum: int, write content checksum (0=no, 1=yes)
        - write_dict_id: int, write dictionary ID in frame header
          (0=no, 1=yes)
          NOTE(review): upstream describes the three write_* values as plain
          on/off flags; the -1 defaults above and any "auto" meaning are
          unconfirmed
        - job_size: int, job size for threading
        - overlap_log: int, overlap size as power of 2
        - force_max_window: int, force maximum window size
        - enable_ldm: int, enable long distance matching
        - ldm_hash_log: int, LDM hash table size as power of 2
        - ldm_min_match: int, LDM minimum match length
        - ldm_bucket_size_log: int, LDM bucket size as power of 2
        - ldm_hash_rate_log: int, LDM hash rate as power of 2
        - threads: int, number of threads (0 = multi-threaded compression
          disabled; negative = one thread per detected logical CPU)
        """

    @staticmethod
    def from_level(
        level: int,
        source_size: int = 0,
        dict_size: int = 0,
        **kwargs
    ) -> ZstdCompressionParameters:
        """
        Create parameters from compression level with optional hints.

        Parameters:
        - level: int, compression level (maximum 22; negative values select
          the faster, lower-ratio levels)
        - source_size: int, hint about source data size
        - dict_size: int, dictionary size if using dictionary
        - **kwargs: additional parameter overrides, using the same names as
          the constructor's parameters (e.g. strategy, enable_ldm)

        Returns:
        ZstdCompressionParameters: Configured parameters
        """

    def estimated_compression_context_size(self) -> int:
        """Estimate memory usage for these parameters in bytes."""

class CompressionParameters(ZstdCompressionParameters):
    """
    Backward-compatible name for ZstdCompressionParameters.

    Declared as a subclass that adds nothing of its own.
    NOTE(review): newer upstream releases may no longer ship this name -
    confirm it exists in the installed version; prefer
    ZstdCompressionParameters in new code.
    """

Usage Example:

import zstandard as zstd

# Create parameters from level with custom tweaks
params = zstd.ZstdCompressionParameters.from_level(
    level=9,
    source_size=1024*1024,  # 1MB hint
    strategy=zstd.STRATEGY_BTULTRA2,
    enable_ldm=1
)

# Use custom parameters
# Stand-in input sized to match the 1MB source_size hint above; the original
# snippet used `data` without defining it.
data = b"\x00" * (1024*1024)
compressor = zstd.ZstdCompressor(compression_params=params)
compressed = compressor.compress(data)

# Check memory usage
memory_usage = params.estimated_compression_context_size()
print(f"Estimated memory usage: {memory_usage} bytes")

Streaming Objects

Low-level streaming compression objects for fine-grained control over the compression process.

class ZstdCompressionObj:
    def compress(self, data: bytes) -> bytes:
        """
        Compress data chunk.

        Parameters:
        - data: bytes to compress

        Returns:
        bytes: Compressed data (may be empty while input is still being
        buffered internally)
        """

    def flush(self, flush_mode: int = COMPRESSOBJ_FLUSH_FINISH) -> bytes:
        """
        Flush compression buffer.

        Parameters:
        - flush_mode: int, flush mode:
          COMPRESSOBJ_FLUSH_FINISH (default) ends the zstd frame; the object
          cannot be used for further compress() calls afterwards.
          COMPRESSOBJ_FLUSH_BLOCK emits the data buffered so far as a
          complete block and leaves the object usable for more compress()
          calls.

        Returns:
        bytes: Compressed data that was still buffered; this is the final
        data of the frame only when flush_mode is COMPRESSOBJ_FLUSH_FINISH
        """

class ZstdCompressionChunker:
    def compress(self, data: bytes):
        """Feed `data` to the compressor; yields compressed chunks as they become available."""

    def flush(self):
        """Push out whatever data is still pending."""

    def finish(self):
        """End the compression operation; yields the last compressed chunks."""

Usage Example:

import zstandard as zstd

compressor = zstd.ZstdCompressor(level=5)


# Placeholder consumers: replace with real handling (socket send, file
# write, ...). The original snippet called these without defining them.
def process_compressed_chunk(chunk):
    print(f"Got {len(chunk)} compressed bytes")


def process_final_chunk(chunk):
    print(f"Got final {len(chunk)} compressed bytes")


# Streaming object
obj = compressor.compressobj()
compressed_chunks = []

# Compress data in chunks
compressed_chunks.append(obj.compress(b"First chunk"))
compressed_chunks.append(obj.compress(b"Second chunk"))
compressed_chunks.append(obj.flush())  # Final data

# Chunker interface
chunker = compressor.chunker()
# compress() may yield nothing until enough output has accumulated
for chunk in chunker.compress(b"Data to compress"):
    process_compressed_chunk(chunk)

# finish() ends the frame and yields whatever output remains
for chunk in chunker.finish():
    process_final_chunk(chunk)

Install with Tessl CLI

npx tessl i tessl/pypi-zstandard

docs

advanced-compression.md

advanced-decompression.md

buffer-operations.md

dictionary-compression.md

frame-analysis.md

index.md

simple-operations.md

tile.json