# Stream Processing

Advanced stream processing capabilities with compression, digest verification, and buffered reading for efficient handling of large archive files and streaming data sources.

## Capabilities

### Buffered Reader

Core buffered reader with optional decompression support for efficient stream processing.

```python { .api }
class BufferedReader:
    def __init__(self, stream, block_size=16384, decomp_type=None,
                 starting_data=None, read_all_members=False):
        """
        Buffered reader with optional decompression.

        Args:
            stream: Source stream to read from
            block_size (int): Buffer size in bytes (default 16384)
            decomp_type (str): Decompression type ('gzip', 'deflate', 'brotli', or None)
            starting_data (bytes): Pre-read data to include in buffer
            read_all_members (bool): Whether to read all compression members
        """

    def read(self, length=None):
        """
        Read data with buffering and optional decompression.

        Args:
            length (int): Number of bytes to read (None for all available)

        Returns:
            bytes: Read data
        """

    def readline(self, length=None):
        """
        Read a line with buffering and optional decompression.

        Args:
            length (int): Maximum line length (None for unlimited)

        Returns:
            bytes: Line data including line ending
        """

    def tell(self):
        """
        Get current position in buffer.

        Returns:
            int: Current buffer position
        """

    def empty(self):
        """
        Check if buffer is empty.

        Returns:
            bool: True if buffer has no more data
        """

    def read_next_member(self):
        """
        Move to next compression member (for multi-member files).

        Returns:
            bool: True if next member found, False if at end
        """

    def rem_length(self):
        """
        Get remaining buffer length.

        Returns:
            int: Number of bytes remaining in buffer
        """

    def close(self):
        """Close the reader and cleanup resources."""

    def set_decomp(self, decomp_type):
        """
        Set or change decompression type.

        Args:
            decomp_type (str): Decompression type ('gzip', 'deflate', 'brotli', or None)
        """

    @classmethod
    def get_supported_decompressors(cls):
        """
        Get list of supported decompression types.

        Returns:
            list: Available decompression types
        """
```

### Decompressing Buffered Reader

Specialized buffered reader that defaults to gzip decompression.

```python { .api }
class DecompressingBufferedReader(BufferedReader):
    def __init__(self, stream, **kwargs):
        """
        BufferedReader that defaults to gzip decompression.

        Args:
            stream: Source stream to read from
            **kwargs: Additional arguments passed to BufferedReader
        """
```

### Chunked Data Reader

Buffered reader with HTTP chunked encoding support for handling chunked transfer encoding.

```python { .api }
class ChunkedDataReader(BufferedReader):
    def __init__(self, stream, raise_exceptions=False, **kwargs):
        """
        BufferedReader with HTTP chunked encoding support.

        Args:
            stream: Source stream with chunked data
            raise_exceptions (bool): Whether to raise exceptions on chunk errors
            **kwargs: Additional arguments passed to BufferedReader
        """

class ChunkedDataException(Exception):
    def __init__(self, msg, data=b''):
        """
        Exception for chunked data parsing errors.

        Args:
            msg (str): Error message
            data (bytes): Problematic data chunk
        """
```

### Limit Reader

Reader that enforces byte limits for controlled reading of stream portions.

```python { .api }
class LimitReader:
    def __init__(self, stream, limit):
        """
        Reader that limits reading to specified byte count.

        Args:
            stream: Source stream to read from
            limit (int): Maximum number of bytes to read
        """

    def read(self, length=None):
        """
        Read data with limit enforcement.

        Args:
            length (int): Number of bytes to read (limited by remaining quota)

        Returns:
            bytes: Read data (may be less than requested due to limit)
        """

    def readline(self, length=None):
        """
        Read line with limit enforcement.

        Args:
            length (int): Maximum line length

        Returns:
            bytes: Line data (may be truncated due to limit)
        """

    def tell(self):
        """
        Get position within limited stream.

        Returns:
            int: Number of bytes read so far
        """

    def close(self):
        """Close underlying stream."""

    @staticmethod
    def wrap_stream(stream, content_length):
        """
        Wrap stream with LimitReader if content_length is specified.

        Args:
            stream: Stream to potentially wrap
            content_length (int or None): Content length limit

        Returns:
            Stream or LimitReader: Original stream or wrapped with limit
        """
```

### Digest Verifying Reader

Reader that verifies digests while reading data, extending LimitReader with digest validation.

```python { .api }
class DigestVerifyingReader(LimitReader):
    def __init__(self, stream, limit, digest_checker, record_type=None,
                 payload_digest=None, block_digest=None, segment_number=None):
        """
        Reader that verifies digests while reading.

        Args:
            stream: Source stream to read from
            limit (int): Maximum bytes to read
            digest_checker: DigestChecker instance for validation
            record_type (str): Type of record being read
            payload_digest (str): Expected payload digest
            block_digest (str): Expected block digest
            segment_number (int): Segment number for multi-part records
        """

    def begin_payload(self):
        """Mark beginning of payload for digest calculation."""

class DigestChecker:
    def __init__(self, kind=None):
        """
        Tracks digest verification results.

        Args:
            kind (str): Type of digest checking being performed
        """

    @property
    def passed(self):
        """
        Whether all digests passed verification.

        Returns:
            bool: True if all digests verified successfully
        """

    @property
    def problems(self):
        """
        List of problems encountered during verification.

        Returns:
            list: Problem descriptions
        """

    def problem(self, value, passed=False):
        """
        Record a verification problem.

        Args:
            value (str): Description of the problem
            passed (bool): Whether this should be considered a pass despite the problem
        """
```

### Decompression Utilities

Utility functions for creating and managing decompressors.

```python { .api }
def gzip_decompressor():
    """
    Create a gzip decompressor.

    Returns:
        Decompressor object for gzip data
    """

def deflate_decompressor():
    """
    Create a deflate decompressor.

    Returns:
        Decompressor object for deflate data
    """

def deflate_decompressor_alt():
    """
    Create alternative deflate decompressor with different window size.

    Returns:
        Decompressor object for deflate data (alternative settings)
    """

def try_brotli_init():
    """
    Initialize brotli decompression support if available.

    Returns:
        bool: True if brotli is available and initialized
    """
```

### Hash Digest Utilities

Utility classes for computing and managing hash digests during stream processing.

```python { .api }
class Digester:
    def __init__(self, type_='sha1'):
        """
        Hash digest calculator for stream data.

        Args:
            type_ (str): Hash algorithm type ('sha1', 'md5', 'sha256', etc.)
        """

    def update(self, buff):
        """
        Update hash with new data.

        Args:
            buff (bytes): Data to add to hash calculation
        """

    def __str__(self):
        """
        Get final hash digest as string.

        Returns:
            str: Hash digest in format 'algorithm:hexdigest'
        """
```

## Usage Examples

### Basic Buffered Reading

```python
from warcio.bufferedreaders import BufferedReader
import io

# Create buffered reader for efficient reading
data = b"Hello, World!" * 1000
stream = io.BytesIO(data)
reader = BufferedReader(stream, block_size=4096)

# Read data in chunks
chunk1 = reader.read(100)
chunk2 = reader.read(200)

print(f"Read {len(chunk1)} + {len(chunk2)} bytes")
print(f"Buffer position: {reader.tell()}")

# Read lines
stream.seek(0)
reader = BufferedReader(stream)
line = reader.readline()
print(f"First line: {line}")

reader.close()
```

### Decompression Reading

```python
from warcio.bufferedreaders import DecompressingBufferedReader
import gzip
import io

# Create compressed data
original_data = b"This is some test data that will be compressed"
compressed_data = gzip.compress(original_data)

# Read with automatic decompression
stream = io.BytesIO(compressed_data)
reader = DecompressingBufferedReader(stream)

# Data is automatically decompressed
decompressed = reader.read()
print(f"Original: {len(original_data)} bytes")
print(f"Compressed: {len(compressed_data)} bytes")
print(f"Decompressed: {len(decompressed)} bytes")
print(f"Match: {decompressed == original_data}")

reader.close()
```

### Manual Decompression Setup

```python
from warcio.bufferedreaders import BufferedReader
import gzip
import io

# Create gzip compressed data
original_data = b"Manual decompression example data"
compressed_data = gzip.compress(original_data)

# Set up reader with manual decompression type
stream = io.BytesIO(compressed_data)
reader = BufferedReader(stream, decomp_type='gzip')

# Read decompressed data
result = reader.read()
print(f"Manually decompressed: {result == original_data}")

# Check supported decompression types
supported = BufferedReader.get_supported_decompressors()
print(f"Supported decompressors: {supported}")

reader.close()
```

### Chunked Data Reading

```python
from warcio.bufferedreaders import ChunkedDataReader, ChunkedDataException
import io

# Create HTTP chunked data
chunked_data = b"5\r\nHello\r\n6\r\n World\r\n0\r\n\r\n"
stream = io.BytesIO(chunked_data)

try:
    reader = ChunkedDataReader(stream, raise_exceptions=True)

    # Read dechunked data
    result = reader.read()
    print(f"Dechunked data: {result}") # b"Hello World"

except ChunkedDataException as e:
    print(f"Chunked data error: {e}")
    print(f"Problematic data: {e.data}")
finally:
    reader.close()
```

### Limited Reading

```python
from warcio.limitreader import LimitReader
import io

# Create large data stream
large_data = b"x" * 10000
stream = io.BytesIO(large_data)

# Limit reading to first 100 bytes
limited_reader = LimitReader(stream, limit=100)

# Read data - will stop at limit
data = limited_reader.read()
print(f"Read {len(data)} bytes (limited to 100)")
print(f"Position: {limited_reader.tell()}")

# Trying to read more returns empty
more_data = limited_reader.read()
print(f"Additional read: {len(more_data)} bytes")

limited_reader.close()
```

### Automatic Stream Wrapping

```python
from warcio.limitreader import LimitReader
import io

# Test automatic wrapping
large_stream = io.BytesIO(b"x" * 1000)

# Wrap with limit if content length specified
wrapped = LimitReader.wrap_stream(large_stream, content_length=100)
print(f"Wrapped stream type: {type(wrapped).__name__}")

# No wrapping if no content length
unwrapped = LimitReader.wrap_stream(large_stream, content_length=None)
print(f"Unwrapped stream type: {type(unwrapped).__name__}")
```

### Digest Verification

```python
from warcio.digestverifyingreader import DigestVerifyingReader, DigestChecker
import io
import hashlib

# Create test data and calculate digest
test_data = b"This is test data for digest verification"
expected_digest = "sha1:" + hashlib.sha1(test_data).hexdigest()

# Set up digest checker
checker = DigestChecker(kind="test")
stream = io.BytesIO(test_data)

# Create verifying reader
verifying_reader = DigestVerifyingReader(
    stream=stream,
    limit=len(test_data),
    digest_checker=checker,
    payload_digest=expected_digest
)

# Begin payload reading (starts digest calculation)
verifying_reader.begin_payload()

# Read data - digest is calculated during reading
data = verifying_reader.read()
print(f"Read {len(data)} bytes")

# Check verification results
print(f"Digest verification passed: {checker.passed}")
if not checker.passed:
    print(f"Problems: {checker.problems}")

verifying_reader.close()
```

### Multi-Member Compression

```python
from warcio.bufferedreaders import BufferedReader
import gzip
import io

# Create multi-member gzip data
data1 = b"First member data"
data2 = b"Second member data"
compressed1 = gzip.compress(data1)
compressed2 = gzip.compress(data2)
multi_member_data = compressed1 + compressed2

# Read multi-member file
stream = io.BytesIO(multi_member_data)
reader = BufferedReader(stream, decomp_type='gzip', read_all_members=True)

# Read first member
member1 = reader.read()
print(f"First member: {member1}")

# Move to next member
if reader.read_next_member():
    member2 = reader.read()
    print(f"Second member: {member2}")
else:
    print("No second member found")

reader.close()
```

### Error Handling and Cleanup

```python
from warcio.bufferedreaders import BufferedReader, ChunkedDataException
from warcio.limitreader import LimitReader
import io

stream = io.BytesIO(b"test data")

try:
    # Create readers
    buffered = BufferedReader(stream)
    limited = LimitReader(buffered, limit=50)

    # Use readers
    data = limited.read(10)
    print(f"Successfully read {len(data)} bytes")

except Exception as e:
    print(f"Error during reading: {e}")

finally:
    # Always close readers to free resources
    if 'limited' in locals():
        limited.close()
    if 'buffered' in locals():
        buffered.close()
```