# Stream Format

⚠️ **Warning**: The stream format module is experimental and unmaintained. It is not included in distributed wheels and should be used with caution in production environments.

Stream format compression provides a streaming compression interface with a double-buffer strategy for continuous data processing.

## Capabilities

### Stream Compression

Class for streaming compression with configurable buffer management.

```python { .api }
class LZ4StreamCompressor:
    """Stream compression with double-buffer strategy for continuous data processing."""

    def __init__(
        self,
        strategy: str,
        buffer_size: int,
        mode: str = "default",
        acceleration: int | bool = True,
        compression_level: int = 9,
        return_bytearray: bool = False,
        store_comp_size: int = 4,
        dictionary: str | bytes = ""
    ):
        """
        Initialize stream compressor.

        Args:
            strategy: Buffer strategy ("double_buffer" is the only supported value) - required
            buffer_size: Base buffer size for compression - required
            mode: Compression mode ("default", "fast", "high_compression")
            acceleration: Acceleration factor for fast mode; True equals 1 (default: True)
            compression_level: Level for high_compression mode (1-12, default: 9)
            return_bytearray: Return bytearray instead of bytes (default: False)
            store_comp_size: Bytes used to store each compressed block's size (0, 1, 2, or 4; default: 4)
            dictionary: Compression dictionary for improved ratio (default: empty string)
        """

    def compress(self, chunk: bytes) -> bytes:
        """
        Compress a data chunk using the stream context.

        Args:
            chunk: Data chunk to compress

        Returns:
            bytes: Compressed chunk with size header
        """

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
```

### Stream Decompression

Class for streaming decompression with buffer management and block extraction.

```python { .api }
class LZ4StreamDecompressor:
    """Stream decompression with double-buffer strategy and block extraction."""

    def __init__(
        self,
        strategy: str,
        buffer_size: int,
        return_bytearray: bool = False,
        store_comp_size: int = 4,
        dictionary: str | bytes = ""
    ):
        """
        Initialize stream decompressor.

        Args:
            strategy: Buffer strategy ("double_buffer" is the only supported value) - required
            buffer_size: Base buffer size for decompression - required
            return_bytearray: Return bytearray instead of bytes (default: False)
            store_comp_size: Bytes used for the compressed block size (0, 1, 2, or 4; default: 4)
            dictionary: Decompression dictionary matching the compression dictionary (default: empty string)
        """

    def decompress(self, chunk: bytes) -> bytes:
        """
        Decompress a data chunk using the stream context.

        Args:
            chunk: Compressed data chunk with size header

        Returns:
            bytes: Decompressed data
        """

    def get_block(self, stream: bytes) -> bytes:
        """
        Extract the first compressed block from stream data.

        Args:
            stream: Stream data containing compressed blocks

        Returns:
            bytes: First compressed block from the stream
        """

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
```

### Exception Handling

Exception class for stream operations.

```python { .api }
class LZ4StreamError(Exception):
    """Exception raised when LZ4 stream operations fail."""
```

### Constants

Stream-specific constants and size limits.

```python { .api }
LZ4_MAX_INPUT_SIZE: int  # Maximum input size for stream operations
```
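
Because stream operations reject inputs above the maximum size, callers typically split large payloads before compressing. The sketch below shows a generic chunking helper; the `chunked` helper and the 4 KiB limit are illustrative assumptions, not part of the `lz4.stream` API (in real code you would compare against `lz4.stream.LZ4_MAX_INPUT_SIZE`).

```python
# Hypothetical helper: split data into slices no larger than max_size
# before feeding them to a stream compressor. The 4096-byte limit here
# is an arbitrary illustration, not the actual LZ4_MAX_INPUT_SIZE value.
def chunked(data: bytes, max_size: int):
    """Yield successive slices of data, each at most max_size bytes."""
    for offset in range(0, len(data), max_size):
        yield data[offset:offset + max_size]

chunks = list(chunked(b"x" * 10_000, 4096))

# Each slice respects the limit; joined, they reconstruct the input.
assert all(len(c) <= 4096 for c in chunks)
assert b"".join(chunks) == b"x" * 10_000
```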

## Usage Examples

### Basic Stream Compression

```python
import lz4.stream

# Stream compression with context manager
data_chunks = [b"chunk1", b"chunk2", b"chunk3"]

with lz4.stream.LZ4StreamCompressor("double_buffer", 4096) as compressor:
    compressed_chunks = []
    for chunk in data_chunks:
        compressed = compressor.compress(chunk)
        compressed_chunks.append(compressed)

# Stream decompression
with lz4.stream.LZ4StreamDecompressor("double_buffer", 4096) as decompressor:
    decompressed_chunks = []
    for compressed_chunk in compressed_chunks:
        decompressed = decompressor.decompress(compressed_chunk)
        decompressed_chunks.append(decompressed)
```

### High-Performance Streaming

```python
import lz4.stream

# Configure for high-speed streaming
compressor = lz4.stream.LZ4StreamCompressor(
    "double_buffer",  # Required strategy argument
    8192,             # Required buffer_size argument
    mode="fast",
    acceleration=4,
    return_bytearray=True
)

decompressor = lz4.stream.LZ4StreamDecompressor(
    "double_buffer",  # Required strategy argument
    8192,             # Required buffer_size argument
    return_bytearray=True
)

# Process continuous data stream
try:
    with compressor, decompressor:
        for data_chunk in continuous_data_stream():
            # Compress chunk
            compressed = compressor.compress(data_chunk)

            # Immediately decompress for verification
            verified = decompressor.decompress(compressed)

            if verified != data_chunk:
                raise ValueError("Data integrity check failed")

except lz4.stream.LZ4StreamError as e:
    print(f"Stream operation failed: {e}")
```

### Dictionary-Based Stream Compression

```python
import lz4.stream

# Use a dictionary for improved compression of similar data
dictionary = b"common data patterns that repeat across chunks"

# Set up compressor and decompressor with the same dictionary
compressor = lz4.stream.LZ4StreamCompressor(
    "double_buffer",  # Required strategy argument
    4096,             # Required buffer_size argument
    mode="high_compression",
    compression_level=12,
    dictionary=dictionary
)

decompressor = lz4.stream.LZ4StreamDecompressor(
    "double_buffer",  # Required strategy argument
    4096,             # Required buffer_size argument
    dictionary=dictionary
)

# Process data with dictionary context
with compressor, decompressor:
    for chunk in data_with_common_patterns:
        compressed = compressor.compress(chunk)
        decompressed = decompressor.decompress(compressed)

        assert decompressed == chunk
```

### Block Extraction

```python
import lz4.stream

# Extract individual blocks from stream data
decompressor = lz4.stream.LZ4StreamDecompressor(
    "double_buffer",  # Required strategy argument
    4096              # Required buffer_size argument
)

# Get stream data containing multiple compressed blocks
stream_data = get_compressed_stream()

# Extract the first block
first_block = decompressor.get_block(stream_data)

# Process the extracted block
with decompressor:
    decompressed = decompressor.decompress(first_block)
```

## Limitations and Considerations

### Experimental Status

- **Unmaintained**: This module is not actively maintained
- **Distribution**: Not included in distributed wheels (pip installs)
- **Stability**: May have undocumented behavior or bugs
- **Production Use**: Not recommended for production applications

### Alternative Recommendations

For production stream compression needs, consider:

1. **Frame Format with Incremental Classes**: Use `LZ4FrameCompressor` and `LZ4FrameDecompressor` for production streaming
2. **Block Format with Manual Framing**: Use `lz4.block` functions with custom framing logic
3. **Standard Library**: Consider `gzip`, `bz2`, or `lzma` modules for established streaming compression
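
The manual-framing approach in option 2 can be sketched generically: prefix each compressed block with a fixed-width size header (mirroring this module's `store_comp_size=4` convention) and walk those headers on the way back out. The sketch below uses `zlib` from the standard library as a stand-in codec so it runs without `lz4` installed; with `lz4.block` available, you would swap in `lz4.block.compress` and `lz4.block.decompress`.

```python
import struct
import zlib  # stand-in codec; substitute lz4.block functions if available


def frame_blocks(chunks):
    """Compress each chunk and prefix it with a 4-byte little-endian size."""
    out = bytearray()
    for chunk in chunks:
        compressed = zlib.compress(chunk)
        out += struct.pack("<I", len(compressed)) + compressed
    return bytes(out)


def unframe_blocks(stream):
    """Walk the stream, reading each size prefix and decompressing its block."""
    offset = 0
    while offset < len(stream):
        (size,) = struct.unpack_from("<I", stream, offset)
        offset += 4
        yield zlib.decompress(stream[offset:offset + size])
        offset += size


framed = frame_blocks([b"chunk1", b"chunk2"])
assert list(unframe_blocks(framed)) == [b"chunk1", b"chunk2"]
```

The fixed-width prefix makes block boundaries self-describing, which is exactly what `store_comp_size` provides in the stream format.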

### Example Migration to Frame Format

```python
# Instead of stream format (experimental)
# import lz4.stream
# with lz4.stream.LZ4StreamCompressor("double_buffer", 4096) as compressor:
#     compressed = compressor.compress(chunk)

# Use frame format (production-ready)
import lz4.frame

chunk = b"data to compress"

with lz4.frame.LZ4FrameCompressor() as compressor:
    header = compressor.begin()
    compressed_chunk = compressor.compress(chunk)
    footer = compressor.flush()

# Combine for complete compressed data
complete_compressed = header + compressed_chunk + footer
```