Zstandard bindings for Python providing high-performance compression and decompression operations.
Sophisticated compression capabilities including customizable parameters, streaming interfaces, dictionary support, and multi-threading for high-performance applications.
Main compression class providing full control over compression parameters and advanced compression modes.
class ZstdCompressor:
def __init__(
self,
level: int = 3,
dict_data: ZstdCompressionDict = None,
compression_params: ZstdCompressionParameters = None,
write_checksum: bool = None,
write_content_size: bool = None,
write_dict_id: bool = None,
threads: int = 0
):
"""
Create a compression context.
Parameters:
- level: int, compression level (1-22, default 3)
- dict_data: ZstdCompressionDict, compression dictionary
- compression_params: ZstdCompressionParameters, detailed parameters
- write_checksum: bool, include integrity checksum
- write_content_size: bool, write original size in frame header
- write_dict_id: bool, write dictionary ID in frame header
- threads: int, number of threads for compression (0 = auto)
"""
def memory_size(self) -> int:
"""Get memory usage of compression context in bytes."""
def compress(self, data: bytes) -> bytes:
"""
Compress data in one operation.
Parameters:
- data: bytes-like object to compress
Returns:
bytes: Compressed data
"""
def compressobj(self, size: int = -1) -> ZstdCompressionObj:
"""
Create a compression object for streaming operations.
Parameters:
- size: int, hint about total size of data to compress
Returns:
ZstdCompressionObj: Streaming compression object
"""
def chunker(self, size: int = -1, chunk_size: int = -1) -> ZstdCompressionChunker:
"""
Create a compression chunker for processing data in chunks.
Parameters:
- size: int, hint about total size of data
- chunk_size: int, preferred chunk size
Returns:
ZstdCompressionChunker: Chunking compression interface
"""Usage Example:
import zstandard as zstd
# Basic compressor
compressor = zstd.ZstdCompressor(level=10)
compressed = compressor.compress(b"Data to compress")
# High-performance compressor with threading
compressor = zstd.ZstdCompressor(level=3, threads=4)
compressed = compressor.compress(large_data)
# Memory usage monitoring
print(f"Compressor memory usage: {compressor.memory_size()} bytes")

Stream-based compression for handling large data without loading everything into memory.
class ZstdCompressor:
def stream_writer(
self,
writer,
size: int = -1,
write_size: int = -1,
write_return_read: bool = False,
closefd: bool = True
) -> ZstdCompressionWriter:
"""
Create a streaming compression writer.
Parameters:
- writer: file-like object to write compressed data to
- size: int, hint about total size of data
- write_size: int, preferred write size
- write_return_read: bool, return read count instead of write count
- closefd: bool, whether to close writer when done
Returns:
ZstdCompressionWriter: Streaming compression writer
"""
def stream_reader(
self,
source,
size: int = -1,
read_size: int = -1,
closefd: bool = True
) -> ZstdCompressionReader:
"""
Create a streaming compression reader.
Parameters:
- source: file-like object or bytes to read from
- size: int, hint about total size of data
- read_size: int, preferred read size
- closefd: bool, whether to close source when done
Returns:
ZstdCompressionReader: Streaming compression reader
"""
def copy_stream(
self,
ifh,
ofh,
size: int = -1,
read_size: int = -1,
write_size: int = -1
) -> tuple[int, int]:
"""
Copy and compress data between streams.
Parameters:
- ifh: input file-like object
- ofh: output file-like object
- size: int, hint about total size
- read_size: int, read buffer size
- write_size: int, write buffer size
Returns:
tuple[int, int]: (bytes_read, bytes_written)
"""Usage Examples:
import zstandard as zstd
import io
compressor = zstd.ZstdCompressor(level=5)
# Stream writer - compress data as you write
output = io.BytesIO()
with compressor.stream_writer(output) as writer:
writer.write(b"First chunk of data")
writer.write(b"Second chunk of data")
writer.write(b"Final chunk")
compressed_data = output.getvalue()
# Stream reader - compress data as you read
data = b"Large amount of data to compress"
reader = compressor.stream_reader(io.BytesIO(data))
compressed_chunks = []
while True:
chunk = reader.read(8192)
if not chunk:
break
compressed_chunks.append(chunk)
# Copy between streams with compression
with open('input.txt', 'rb') as input_file, \
open('output.zst', 'wb') as output_file:
bytes_read, bytes_written = compressor.copy_stream(input_file, output_file)
print(f"Read {bytes_read} bytes, wrote {bytes_written} bytes")

Compress data in chunks and yield compressed output incrementally, useful for processing large data streams.
class ZstdCompressor:
def read_to_iter(
self,
reader,
size: int = -1,
read_size: int = -1,
write_size: int = -1
) -> Generator[bytes, None, None]:
"""
Compress data from reader and yield compressed chunks.
Parameters:
- reader: file-like object or bytes to read from
- size: int, hint about total size of data
- read_size: int, read buffer size
- write_size: int, write buffer size
Yields:
bytes: Compressed data chunks
"""Usage Example:
import zstandard as zstd
import io
compressor = zstd.ZstdCompressor(level=5)
# Process large data iteratively
large_data = b"Very large data content that needs streaming compression..."
reader = io.BytesIO(large_data)
# Compress and process chunks as they're produced
compressed_chunks = []
for chunk in compressor.read_to_iter(reader):
compressed_chunks.append(chunk)
# Process each chunk immediately to save memory
process_compressed_chunk(chunk)
# Combine all chunks if needed
final_compressed = b''.join(compressed_chunks)

Parallel compression for improved performance on multi-core systems.
class ZstdCompressor:
def multi_compress_to_buffer(
self,
data,
threads: int = 0
) -> BufferWithSegmentsCollection:
"""
Compress multiple data items in parallel.
Parameters:
- data: BufferWithSegments, BufferWithSegmentsCollection, or list of bytes
- threads: int, number of threads (0 = auto, -1 = no threading)
Returns:
BufferWithSegmentsCollection: Collection of compressed segments
"""Usage Example:
import zstandard as zstd
compressor = zstd.ZstdCompressor(level=3)
# Prepare multiple data items
data_items = [
b"First piece of data to compress",
b"Second piece of data to compress",
b"Third piece of data to compress"
]
# Compress in parallel
result = compressor.multi_compress_to_buffer(data_items, threads=4)
# Access compressed segments
for i in range(len(result)):
segment = result[i]
print(f"Segment {i}: {len(segment)} bytes")
compressed_data = segment.tobytes()

Monitor compression progress and statistics during multi-threaded operations.
class ZstdCompressor:
def frame_progression(self) -> tuple[int, int, int]:
"""
Get compression progress information.
Returns:
tuple[int, int, int]: (bytes_read, bytes_written, bytes_flushed)
"""Usage Example:
import zstandard as zstd
compressor = zstd.ZstdCompressor(level=5, threads=4)
# Start compression
data = b"Large data to monitor compression progress"
compressed = compressor.compress(data)
# Get progression statistics
bytes_read, bytes_written, bytes_flushed = compressor.frame_progression()
print(f"Read: {bytes_read}, Written: {bytes_written}, Flushed: {bytes_flushed}")

Fine-grained control over compression behavior through detailed parameter configuration.
class ZstdCompressionParameters:
    """Fine-grained configuration of zstd compression behavior.

    NOTE(review): documentation stub — method bodies are intentionally empty.
    """

    def __init__(
        self,
        format: int = FORMAT_ZSTD1,
        compression_level: int = 3,
        window_log: int = 0,
        hash_log: int = 0,
        chain_log: int = 0,
        search_log: int = 0,
        min_match: int = 0,
        target_length: int = 0,
        strategy: int = 0,
        write_content_size: int = -1,
        write_checksum: int = -1,
        write_dict_id: int = -1,
        job_size: int = 0,
        overlap_log: int = 0,
        force_max_window: int = 0,
        enable_ldm: int = 0,
        ldm_hash_log: int = 0,
        ldm_min_match: int = 0,
        ldm_bucket_size_log: int = 0,
        ldm_hash_rate_log: int = 0,
        threads: int = 0,
    ):
        """
        Create detailed compression parameters.

        A value of 0 for the tuning knobs means "let zstd choose".

        Parameters:
        - format: int, compression format (FORMAT_ZSTD1, FORMAT_ZSTD1_MAGICLESS)
        - compression_level: int, compression level (1-22)
        - window_log: int, window size as power of 2 (10-31)
        - hash_log: int, hash table size as power of 2 (6-26)
        - chain_log: int, chain table size as power of 2 (6-28)
        - search_log: int, search length as power of 2 (1-26)
        - min_match: int, minimum match length (3-7)
        - target_length: int, target match length (0-999)
        - strategy: int, compression strategy (STRATEGY_*)
        - write_content_size: int, write content size (-1=auto, 0=no, 1=yes)
        - write_checksum: int, write checksum (-1=auto, 0=no, 1=yes)
        - write_dict_id: int, write dictionary ID (-1=auto, 0=no, 1=yes)
        - job_size: int, job size for threading
        - overlap_log: int, overlap size as power of 2
        - force_max_window: int, force maximum window size
        - enable_ldm: int, enable long distance matching
        - ldm_hash_log: int, LDM hash table size as power of 2
        - ldm_min_match: int, LDM minimum match length
        - ldm_bucket_size_log: int, LDM bucket size as power of 2
        - ldm_hash_rate_log: int, LDM hash rate as power of 2
        - threads: int, number of threads
        """

    @staticmethod
    def from_level(
        level: int,
        source_size: int = 0,
        dict_size: int = 0,
        **kwargs,
    ) -> "ZstdCompressionParameters":
        """
        Create parameters from compression level with optional hints.

        Parameters:
        - level: int, compression level (1-22)
        - source_size: int, hint about source data size
        - dict_size: int, dictionary size if using dictionary
        - **kwargs: additional parameter overrides

        Returns:
        ZstdCompressionParameters: Configured parameters
        """

    def estimated_compression_context_size(self) -> int:
        """Estimate memory usage for these parameters in bytes."""
class CompressionParameters(ZstdCompressionParameters):
"""Compatibility alias for ZstdCompressionParameters."""Usage Example:
import zstandard as zstd
# Create parameters from level with custom tweaks
params = zstd.ZstdCompressionParameters.from_level(
level=9,
source_size=1024*1024, # 1MB hint
strategy=zstd.STRATEGY_BTULTRA2,
enable_ldm=1
)
# Use custom parameters
compressor = zstd.ZstdCompressor(compression_params=params)
compressed = compressor.compress(data)
# Check memory usage
memory_usage = params.estimated_compression_context_size()
print(f"Estimated memory usage: {memory_usage} bytes")

Low-level streaming compression objects for fine-grained control over compression process.
class ZstdCompressionObj:
    """Low-level streaming compression object (zlib-like interface).

    NOTE(review): documentation stub — method bodies are intentionally empty.
    """

    def compress(self, data: bytes) -> bytes:
        """
        Compress a chunk of data.

        Parameters:
        - data: bytes to compress

        Returns:
        bytes: Compressed data (may be empty)
        """

    def flush(self, flush_mode: int = COMPRESSOBJ_FLUSH_FINISH) -> bytes:
        """
        Flush the compression buffer.

        Parameters:
        - flush_mode: int, flush mode (COMPRESSOBJ_FLUSH_FINISH,
          COMPRESSOBJ_FLUSH_BLOCK)

        Returns:
        bytes: Final compressed data
        """
class ZstdCompressionChunker:
def compress(self, data: bytes):
"""Compress data and yield chunks."""
def flush(self):
"""Flush any remaining data."""
def finish(self):
"""Finish compression and yield final chunks."""Usage Example:
import zstandard as zstd
compressor = zstd.ZstdCompressor(level=5)
# Streaming object
obj = compressor.compressobj()
compressed_chunks = []
# Compress data in chunks
compressed_chunks.append(obj.compress(b"First chunk"))
compressed_chunks.append(obj.compress(b"Second chunk"))
compressed_chunks.append(obj.flush()) # Final data
# Chunker interface
chunker = compressor.chunker()
for chunk in chunker.compress(b"Data to compress"):
process_compressed_chunk(chunk)
for chunk in chunker.finish():
process_final_chunk(chunk)

Install with Tessl CLI
npx tessl i tessl/pypi-zstandard