LZ4 Bindings for Python providing high-performance lossless data compression with frame and block format support
—
⚠️ Warning: The stream format module is experimental and unmaintained. It is not included in distributed wheels and should be used with caution in production environments.
Stream format compression provides streaming compression interface with double-buffer strategy for continuous data processing.
Class for streaming compression with configurable buffer management.
class LZ4StreamCompressor:
    """Stream compression with double-buffer strategy for continuous data processing.

    Stub/documentation class: method bodies carry docstrings only; the real
    implementation lives in the C extension. Use as a context manager so the
    underlying stream context is tied to the ``with`` scope.
    """

    def __init__(
        self,
        strategy: str,
        buffer_size: int,
        mode: str = "default",
        acceleration: bool = True,
        compression_level: int = 9,
        return_bytearray: bool = False,
        store_comp_size: int = 4,
        dictionary: str | bytes = ""
    ):
        """
        Initialize stream compressor.

        Args:
            strategy: Buffer strategy ("double_buffer" only supported) - required
            buffer_size: Base buffer size for compression - required
            mode: Compression mode ("default", "fast", "high_compression")
            acceleration: Acceleration factor for fast mode (default: True,
                which equals 1; larger integers trade ratio for speed)
            compression_level: Level for high_compression mode (1-12, default: 9)
            return_bytearray: Return bytearray instead of bytes (default: False)
            store_comp_size: Bytes for storing compressed block size (0,1,2,4, default: 4)
            dictionary: Compression dictionary for improved ratio (default: empty string)
        """

    def compress(self, chunk: bytes) -> bytes:
        """
        Compress data chunk using stream context.

        Args:
            chunk: Data chunk to compress

        Returns:
            bytes: Compressed chunk with size header (bytearray when
            ``return_bytearray=True`` was passed to the constructor)
        """

    def __enter__(self):
        """Context manager entry; returns the compressor itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

# Class for streaming decompression with buffer management and block extraction.
class LZ4StreamDecompressor:
    """Stream decompression with double-buffer strategy and block extraction.

    Stub/documentation class: method bodies carry docstrings only; the real
    implementation lives in the C extension. The constructor options must
    mirror those used by the matching LZ4StreamCompressor (same strategy,
    buffer size, store_comp_size and dictionary) for round-trips to work.
    """

    def __init__(
        self,
        strategy: str,
        buffer_size: int,
        return_bytearray: bool = False,
        store_comp_size: int = 4,
        dictionary: str | bytes = ""
    ):
        """
        Initialize stream decompressor.

        Args:
            strategy: Buffer strategy ("double_buffer" only supported) - required
            buffer_size: Base buffer size for decompression - required
            return_bytearray: Return bytearray instead of bytes (default: False)
            store_comp_size: Bytes used for compressed block size (0,1,2,4, default: 4)
            dictionary: Decompression dictionary matching compression dictionary (default: empty string)
        """

    def decompress(self, chunk: bytes) -> bytes:
        """
        Decompress data chunk using stream context.

        Args:
            chunk: Compressed data chunk with size header

        Returns:
            bytes: Decompressed data
        """

    def get_block(self, stream: bytes) -> bytes:
        """
        Extract first compressed block from stream data.

        Args:
            stream: Stream data containing compressed blocks

        Returns:
            bytes: First compressed block from stream
        """

    def __enter__(self):
        """Context manager entry; returns the decompressor itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

# Exception class for stream operations.
class LZ4StreamError(Exception):
    """Exception raised when LZ4 stream operations fail."""

# Stream-specific constants and size limits.
LZ4_MAX_INPUT_SIZE: int  # Maximum input size for stream operations

import lz4.stream
# Stream compression with context manager
data_chunks = [b"chunk1", b"chunk2", b"chunk3"]
with lz4.stream.LZ4StreamCompressor("double_buffer", 4096) as compressor:
    compressed_chunks = []
    for chunk in data_chunks:
        compressed = compressor.compress(chunk)
        compressed_chunks.append(compressed)

# Stream decompression
with lz4.stream.LZ4StreamDecompressor("double_buffer", 4096) as decompressor:
    decompressed_chunks = []
    for compressed_chunk in compressed_chunks:
        decompressed = decompressor.decompress(compressed_chunk)
        decompressed_chunks.append(decompressed)

import lz4.stream
# Configure for high-speed streaming
compressor = lz4.stream.LZ4StreamCompressor(
    "double_buffer",  # Required strategy argument
    8192,             # Required buffer_size argument
    mode="fast",
    acceleration=4,
    return_bytearray=True
)
decompressor = lz4.stream.LZ4StreamDecompressor(
    "double_buffer",  # Required strategy argument
    8192,             # Required buffer_size argument
    return_bytearray=True
)

# Process continuous data stream
try:
    with compressor, decompressor:
        for data_chunk in continuous_data_stream():
            # Compress chunk
            compressed = compressor.compress(data_chunk)
            # Immediately decompress for verification
            verified = decompressor.decompress(compressed)
            if verified != data_chunk:
                raise ValueError("Data integrity check failed")
except lz4.stream.LZ4StreamError as e:
    print(f"Stream operation failed: {e}")

import lz4.stream
# Use dictionary for improved compression of similar data
dictionary = b"common data patterns that repeat across chunks"

# Setup compressor and decompressor with the SAME dictionary on both sides
compressor = lz4.stream.LZ4StreamCompressor(
    "double_buffer",  # Required strategy argument
    4096,             # Required buffer_size argument
    mode="high_compression",
    compression_level=12,
    dictionary=dictionary
)
decompressor = lz4.stream.LZ4StreamDecompressor(
    "double_buffer",  # Required strategy argument
    4096,             # Required buffer_size argument
    dictionary=dictionary
)

# Process data with dictionary context
with compressor, decompressor:
    for chunk in data_with_common_patterns:
        compressed = compressor.compress(chunk)
        decompressed = decompressor.decompress(compressed)
        assert decompressed == chunk

import lz4.stream
# Extract individual blocks from stream data
decompressor = lz4.stream.LZ4StreamDecompressor(
    "double_buffer",  # Required strategy argument
    4096              # Required buffer_size argument
)

# Get stream data containing multiple compressed blocks
stream_data = get_compressed_stream()

# Extract first block
first_block = decompressor.get_block(stream_data)

# Process the extracted block
with decompressor:
    decompressed = decompressor.decompress(first_block)

# For production stream compression needs, consider:
- LZ4FrameCompressor and LZ4FrameDecompressor for production streaming
- lz4.block functions with custom framing logic
- gzip, bz2, or lzma modules for established streaming compression

# Instead of stream format (experimental)
# import lz4.stream
# with lz4.stream.LZ4StreamCompressor() as compressor:
#     compressed = compressor.compress(chunk)

# Use frame format (production-ready)
import lz4.frame

with lz4.frame.LZ4FrameCompressor() as compressor:
    header = compressor.begin()
    compressed_chunk = compressor.compress(chunk)
    footer = compressor.flush()

# Combine for complete compressed data
complete_compressed = header + compressed_chunk + footer

# Install with Tessl CLI
npx tessl i tessl/pypi-lz4