Zstandard bindings for Python providing high-performance compression and decompression operations.

Sophisticated decompression capabilities including streaming interfaces, frame analysis, batch processing, and dictionary support for high-performance data decompression.

Main decompression class providing full control over decompression parameters and advanced decompression modes.
class ZstdDecompressor:
def __init__(
self,
dict_data: ZstdCompressionDict = None,
max_window_size: int = 0,
format: int = FORMAT_ZSTD1
):
"""
Create a decompression context.
Parameters:
- dict_data: ZstdCompressionDict, decompression dictionary
- max_window_size: int, maximum window size (0 = unlimited)
- format: int, expected format (FORMAT_ZSTD1, FORMAT_ZSTD1_MAGICLESS)
"""
def memory_size(self) -> int:
"""Get memory usage of decompression context in bytes."""
def decompress(
self,
data: bytes,
max_output_size: int = 0,
read_across_frames: bool = False,
allow_extra_data: bool = False
) -> bytes:
"""
Decompress data in one operation.
Parameters:
- data: bytes-like object containing compressed data
- max_output_size: int, maximum output size (0 = unlimited)
- read_across_frames: bool, read multiple frames
- allow_extra_data: bool, allow trailing data after frame
Returns:
bytes: Decompressed data
"""
def decompressobj(
self,
write_size: int = -1,
read_across_frames: bool = False
) -> ZstdDecompressionObj:
"""
Create a decompression object for streaming operations.
Parameters:
- write_size: int, preferred write size
- read_across_frames: bool, process multiple frames
Returns:
ZstdDecompressionObj: Streaming decompression object
"""Usage Example:
import zstandard as zstd
# Basic decompressor
decompressor = zstd.ZstdDecompressor()
decompressed = decompressor.decompress(compressed_data)
# Decompressor with safety limits
decompressor = zstd.ZstdDecompressor(max_window_size=1<<20) # 1MB window limit
decompressed = decompressor.decompress(
compressed_data,
max_output_size=10*1024*1024 # 10MB output limit
)
# Memory usage monitoring
print(f"Decompressor memory usage: {decompressor.memory_size()} bytes")Stream-based decompression for handling large compressed data without loading everything into memory.
class ZstdDecompressor:
def stream_reader(
self,
source,
read_size: int = -1,
read_across_frames: bool = False,
closefd: bool = False
) -> ZstdDecompressionReader:
"""
Create a streaming decompression reader.
Parameters:
- source: file-like object or bytes to read from
- read_size: int, preferred read size
- read_across_frames: bool, read multiple frames
- closefd: bool, whether to close source when done
Returns:
ZstdDecompressionReader: Streaming decompression reader
"""
def stream_writer(
self,
writer,
write_size: int = -1,
write_return_read: bool = False,
closefd: bool = True
) -> ZstdDecompressionWriter:
"""
Create a streaming decompression writer.
Parameters:
- writer: file-like object to write decompressed data to
- write_size: int, preferred write size
- write_return_read: bool, return read count instead of write count
- closefd: bool, whether to close writer when done
Returns:
ZstdDecompressionWriter: Streaming decompression writer
"""
def copy_stream(
self,
ifh,
ofh,
read_size: int = -1,
write_size: int = -1
) -> tuple[int, int]:
"""
Copy and decompress data between streams.
Parameters:
- ifh: input file-like object with compressed data
- ofh: output file-like object for decompressed data
- read_size: int, read buffer size
- write_size: int, write buffer size
Returns:
tuple[int, int]: (bytes_read, bytes_written)
"""
def read_to_iter(
self,
reader,
read_size: int = -1,
write_size: int = -1,
skip_bytes: int = 0
):
"""
Create iterator that yields decompressed chunks.
Parameters:
- reader: file-like object or bytes to read from
- read_size: int, read buffer size
- write_size: int, output chunk size
- skip_bytes: int, bytes to skip at start
Yields:
bytes: Decompressed data chunks
"""Usage Examples:
import zstandard as zstd
import io
decompressor = zstd.ZstdDecompressor()
# Stream reader - decompress data as you read
compressed_data = b"..." # compressed data
reader = decompressor.stream_reader(io.BytesIO(compressed_data))
decompressed_chunks = []
while True:
chunk = reader.read(8192)
if not chunk:
break
decompressed_chunks.append(chunk)
# Stream writer - decompress data as you write
output = io.BytesIO()
with decompressor.stream_writer(output) as writer:
writer.write(compressed_chunk1)
writer.write(compressed_chunk2)
decompressed_data = output.getvalue()
# Copy between streams with decompression
with open('compressed.zst', 'rb') as input_file, \
open('decompressed.txt', 'wb') as output_file:
bytes_read, bytes_written = decompressor.copy_stream(input_file, output_file)
print(f"Read {bytes_read} bytes, wrote {bytes_written} bytes")
# Iterator interface
for chunk in decompressor.read_to_iter(io.BytesIO(compressed_data)):
process_decompressed_chunk(chunk)

Parallel decompression for improved performance when processing multiple compressed items.
class ZstdDecompressor:
def multi_decompress_to_buffer(
self,
frames,
decompressed_sizes: bytes = b"",
threads: int = 0
) -> BufferWithSegmentsCollection:
"""
Decompress multiple frames in parallel.
Parameters:
- frames: BufferWithSegments, BufferWithSegmentsCollection, or list of bytes
- decompressed_sizes: bytes, expected sizes of decompressed data
- threads: int, number of threads (0 = auto, -1 = no threading)
Returns:
BufferWithSegmentsCollection: Collection of decompressed segments
"""Usage Example:
import zstandard as zstd
decompressor = zstd.ZstdDecompressor()
# Prepare multiple compressed frames
compressed_frames = [
compressed_data1,
compressed_data2,
compressed_data3
]
# Decompress in parallel
result = decompressor.multi_decompress_to_buffer(compressed_frames, threads=4)
# Access decompressed segments
for i in range(len(result)):
segment = result[i]
print(f"Segment {i}: {len(segment)} bytes")
decompressed_data = segment.tobytes()

Special decompression mode for processing dictionary-compressed frame chains.
class ZstdDecompressor:
def decompress_content_dict_chain(self, frames: list[bytes]) -> bytes:
"""
Decompress a chain of frames where each frame uses the previous as dictionary.
Parameters:
- frames: list of bytes, frames in dependency order
Returns:
bytes: Final decompressed data
"""Usage Example:
import zstandard as zstd
decompressor = zstd.ZstdDecompressor()
# Frames where each uses previous as dictionary
frame_chain = [
base_frame,
dependent_frame1,
dependent_frame2
]
# Decompress the chain
final_data = decompressor.decompress_content_dict_chain(frame_chain)

Low-level streaming decompression objects for fine-grained control over the decompression process.
class ZstdDecompressionObj:
def decompress(self, data: bytes) -> bytes:
"""
Decompress data chunk.
Parameters:
- data: bytes to decompress
Returns:
bytes: Decompressed data (may be empty)
"""
def flush(self, length: int = -1) -> bytes:
"""
Flush decompression buffer.
Parameters:
- length: int, maximum bytes to return
Returns:
bytes: Remaining decompressed data
"""
@property
def unused_data(self) -> bytes:
"""Unused input data after frame end."""
@property
def unconsumed_tail(self) -> bytes:
"""Input data not yet processed."""
@property
def eof(self) -> bool:
"""Whether end of frame has been reached."""Usage Example:
import zstandard as zstd
decompressor = zstd.ZstdDecompressor()
obj = decompressor.decompressobj()
decompressed_chunks = []
compressed_data = b"..." # compressed data
# Process data in chunks
chunk_size = 8192
for i in range(0, len(compressed_data), chunk_size):
chunk = compressed_data[i:i+chunk_size]
decompressed = obj.decompress(chunk)
if decompressed:
decompressed_chunks.append(decompressed)
# Check if frame is complete
if obj.eof:
break
# Get any remaining data
remaining = obj.flush()
if remaining:
decompressed_chunks.append(remaining)
# Check for unused data
if obj.unused_data:
print(f"Unused data: {len(obj.unused_data)} bytes")Stream readers and writers that provide full file-like interfaces for decompression.
class ZstdDecompressionReader:
def read(self, size: int = -1) -> bytes:
"""Read decompressed data."""
def read1(self, size: int = -1) -> bytes:
"""Read at most one buffer worth of data."""
def readinto(self, b) -> int:
"""Read data into pre-allocated buffer."""
def readinto1(self, b) -> int:
"""Read data into buffer, at most one read operation."""
def readline(self, size: int = -1) -> bytes:
"""Read line from decompressed data."""
def readlines(self, hint: int = -1) -> list[bytes]:
"""Read lines from decompressed data."""
def seek(self, pos: int, whence: int = 0) -> int:
"""Seek within decompressed data (limited support)."""
def tell(self) -> int:
"""Get current position."""
def close(self):
"""Close reader and underlying source."""
class ZstdDecompressionWriter:
def write(self, data: bytes) -> int:
"""Write compressed data for decompression."""
def flush(self):
"""Flush any buffered data."""
def close(self):
"""Close writer and underlying destination."""Usage Example:
import zstandard as zstd
decompressor = zstd.ZstdDecompressor()
# Use as file-like reader
with open('data.zst', 'rb') as f:
reader = decompressor.stream_reader(f)
# Read line by line
for line in reader:
process_line(line)
# Random access (if supported)
reader.seek(0)
first_chunk = reader.read(1024)
# Use as file-like writer
with open('output.txt', 'wb') as f:
writer = decompressor.stream_writer(f)
# Write compressed data for decompression
writer.write(compressed_chunk1)
writer.write(compressed_chunk2)
writer.flush()

Install with Tessl CLI
npx tessl i tessl/pypi-zstandard