Streaming WARC (and ARC) I/O library for reading and writing web archive files.
It provides advanced stream-processing capabilities — compression, digest verification, and buffered reading — for efficient handling of large archive files and streaming data sources.
The core component below is a buffered reader with optional decompression support for efficient stream processing.
class BufferedReader:
    """Core buffered reader with optional decompression support.

    NOTE(review): every method in this block is a documentation stub
    (docstring-only body) and therefore returns None until implemented.
    """

    def __init__(self, stream, block_size=16384, decomp_type=None,
                 starting_data=None, read_all_members=False):
        """
        Buffered reader with optional decompression.

        Args:
            stream: Source stream to read from
            block_size (int): Buffer size in bytes (default 16384)
            decomp_type (str): Decompression type ('gzip', 'deflate', 'brotli', or None)
            starting_data (bytes): Pre-read data to include in buffer
            read_all_members (bool): Whether to read all compression members
        """

    def read(self, length=None):
        """
        Read data with buffering and optional decompression.

        Args:
            length (int): Number of bytes to read (None for all available)

        Returns:
            bytes: Read data
        """

    def readline(self, length=None):
        """
        Read a line with buffering and optional decompression.

        Args:
            length (int): Maximum line length (None for unlimited)

        Returns:
            bytes: Line data including line ending
        """

    def tell(self):
        """
        Get current position in buffer.

        Returns:
            int: Current buffer position
        """

    def empty(self):
        """
        Check if buffer is empty.

        Returns:
            bool: True if buffer has no more data
        """

    def read_next_member(self):
        """
        Move to next compression member (for multi-member files).

        Returns:
            bool: True if next member found, False if at end
        """

    def rem_length(self):
        """
        Get remaining buffer length.

        Returns:
            int: Number of bytes remaining in buffer
        """

    def close(self):
        """Close the reader and cleanup resources."""

    def set_decomp(self, decomp_type):
        """
        Set or change decompression type.

        Args:
            decomp_type (str): Decompression type ('gzip', 'deflate', 'brotli', or None)
        """

    @classmethod
    def get_supported_decompressors(cls):
        """
        Get list of supported decompression types.

        Returns:
            list: Available decompression types
        """

# Specialized buffered reader that defaults to gzip decompression.
class DecompressingBufferedReader(BufferedReader):
    """BufferedReader subclass that defaults to gzip decompression (documentation stub)."""

    def __init__(self, stream, **kwargs):
        """
        BufferedReader that defaults to gzip decompression.

        Args:
            stream: Source stream to read from
            **kwargs: Additional arguments passed to BufferedReader
        """

# Buffered reader with HTTP chunked encoding support for handling chunked transfer encoding.
class ChunkedDataReader(BufferedReader):
    """BufferedReader subclass that decodes HTTP chunked transfer encoding (documentation stub)."""

    def __init__(self, stream, raise_exceptions=False, **kwargs):
        """
        BufferedReader with HTTP chunked encoding support.

        Args:
            stream: Source stream with chunked data
            raise_exceptions (bool): Whether to raise exceptions on chunk errors
            **kwargs: Additional arguments passed to BufferedReader
        """
class ChunkedDataException(Exception):
    """Exception raised for chunked-data parsing errors.

    NOTE(review): stub __init__ does not store msg/data; the real
    implementation presumably sets self.data (examples read e.data) — confirm.
    """

    def __init__(self, msg, data=b''):
        """
        Exception for chunked data parsing errors.

        Args:
            msg (str): Error message
            data (bytes): Problematic data chunk
        """

# Reader that enforces byte limits for controlled reading of stream portions.
class LimitReader:
    """Reader that limits reading to a fixed byte quota (documentation stub)."""

    def __init__(self, stream, limit):
        """
        Reader that limits reading to specified byte count.

        Args:
            stream: Source stream to read from
            limit (int): Maximum number of bytes to read
        """

    def read(self, length=None):
        """
        Read data with limit enforcement.

        Args:
            length (int): Number of bytes to read (limited by remaining quota)

        Returns:
            bytes: Read data (may be less than requested due to limit)
        """

    def readline(self, length=None):
        """
        Read line with limit enforcement.

        Args:
            length (int): Maximum line length

        Returns:
            bytes: Line data (may be truncated due to limit)
        """

    def tell(self):
        """
        Get position within limited stream.

        Returns:
            int: Number of bytes read so far
        """

    def close(self):
        """Close underlying stream."""

    @staticmethod
    def wrap_stream(stream, content_length):
        """
        Wrap stream with LimitReader if content_length is specified.

        Args:
            stream: Stream to potentially wrap
            content_length (int or None): Content length limit

        Returns:
            Stream or LimitReader: Original stream or wrapped with limit
        """

# Reader that verifies digests while reading data, extending LimitReader with digest validation.
class DigestVerifyingReader(LimitReader):
    """LimitReader subclass that verifies digests while reading (documentation stub)."""

    def __init__(self, stream, limit, digest_checker, record_type=None,
                 payload_digest=None, block_digest=None, segment_number=None):
        """
        Reader that verifies digests while reading.

        Args:
            stream: Source stream to read from
            limit (int): Maximum bytes to read
            digest_checker: DigestChecker instance for validation
            record_type (str): Type of record being read
            payload_digest (str): Expected payload digest
            block_digest (str): Expected block digest
            segment_number (int): Segment number for multi-part records
        """

    def begin_payload(self):
        """Mark beginning of payload for digest calculation."""
class DigestChecker:
    """Tracks digest verification results (documentation stub)."""

    def __init__(self, kind=None):
        """
        Tracks digest verification results.

        Args:
            kind (str): Type of digest checking being performed
        """

    @property
    def passed(self):
        """
        Whether all digests passed verification.

        Returns:
            bool: True if all digests verified successfully
        """

    @property
    def problems(self):
        """
        List of problems encountered during verification.

        Returns:
            list: Problem descriptions
        """

    def problem(self, value, passed=False):
        """
        Record a verification problem.

        Args:
            value (str): Description of the problem
            passed (bool): Whether this should be considered a pass despite the problem
        """

# Utility functions for creating and managing decompressors.
def gzip_decompressor():
    """
    Create a gzip decompressor.

    Returns:
        Decompressor object for gzip data
    """
def deflate_decompressor():
    """
    Create a deflate decompressor.

    Returns:
        Decompressor object for deflate data
    """
def deflate_decompressor_alt():
    """
    Create alternative deflate decompressor with different window size.

    Returns:
        Decompressor object for deflate data (alternative settings)
    """
def try_brotli_init():
    """
    Initialize brotli decompression support if available.

    Returns:
        bool: True if brotli is available and initialized
    """

# Utility classes for computing and managing hash digests during stream processing.
class Digester:
    """Hash digest calculator for stream data (documentation stub)."""

    def __init__(self, type_='sha1'):
        """
        Hash digest calculator for stream data.

        Args:
            type_ (str): Hash algorithm type ('sha1', 'md5', 'sha256', etc.)
        """

    def update(self, buff):
        """
        Update hash with new data.

        Args:
            buff (bytes): Data to add to hash calculation
        """

    def __str__(self):
        """
        Get final hash digest as string.

        Returns:
            str: Hash digest in format 'algorithm:hexdigest'
        """

# --- Usage examples ---
from warcio.bufferedreaders import BufferedReader
# Example: basic buffered reading.
from warcio.bufferedreaders import BufferedReader

import io

# Create buffered reader for efficient reading
data = b"Hello, World!" * 1000
stream = io.BytesIO(data)
reader = BufferedReader(stream, block_size=4096)

# Read data in chunks
chunk1 = reader.read(100)
chunk2 = reader.read(200)
print(f"Read {len(chunk1)} + {len(chunk2)} bytes")
print(f"Buffer position: {reader.tell()}")

# Read lines
stream.seek(0)
reader = BufferedReader(stream)
line = reader.readline()
print(f"First line: {line}")
reader.close()

from warcio.bufferedreaders import DecompressingBufferedReader
# Example: automatic gzip decompression.
from warcio.bufferedreaders import DecompressingBufferedReader

import gzip
import io

# Create compressed data
original_data = b"This is some test data that will be compressed"
compressed_data = gzip.compress(original_data)

# Read with automatic decompression
stream = io.BytesIO(compressed_data)
reader = DecompressingBufferedReader(stream)

# Data is automatically decompressed
decompressed = reader.read()
print(f"Original: {len(original_data)} bytes")
print(f"Compressed: {len(compressed_data)} bytes")
print(f"Decompressed: {len(decompressed)} bytes")
print(f"Match: {decompressed == original_data}")
reader.close()

from warcio.bufferedreaders import BufferedReader
# Example: manual decompression type selection.
from warcio.bufferedreaders import BufferedReader

import gzip
import io

# Create gzip compressed data
original_data = b"Manual decompression example data"
compressed_data = gzip.compress(original_data)

# Set up reader with manual decompression type
stream = io.BytesIO(compressed_data)
reader = BufferedReader(stream, decomp_type='gzip')

# Read decompressed data
result = reader.read()
print(f"Manually decompressed: {result == original_data}")

# Check supported decompression types
supported = BufferedReader.get_supported_decompressors()
print(f"Supported decompressors: {supported}")
reader.close()

from warcio.bufferedreaders import ChunkedDataReader, ChunkedDataException
# Example: HTTP chunked transfer decoding with error handling.
from warcio.bufferedreaders import ChunkedDataReader, ChunkedDataException

import io

# Create HTTP chunked data
chunked_data = b"5\r\nHello\r\n6\r\n World\r\n0\r\n\r\n"
stream = io.BytesIO(chunked_data)
reader = None  # predeclared so the finally block is safe even if construction fails
try:
    reader = ChunkedDataReader(stream, raise_exceptions=True)
    # Read dechunked data
    result = reader.read()
    print(f"Dechunked data: {result}")  # b"Hello World"
except ChunkedDataException as e:
    print(f"Chunked data error: {e}")
    print(f"Problematic data: {e.data}")
finally:
    if reader is not None:
        reader.close()

from warcio.limitreader import LimitReader
# Example: limiting reads to a fixed byte quota.
from warcio.limitreader import LimitReader

import io

# Create large data stream
large_data = b"x" * 10000
stream = io.BytesIO(large_data)

# Limit reading to first 100 bytes
limited_reader = LimitReader(stream, limit=100)

# Read data - will stop at limit
data = limited_reader.read()
print(f"Read {len(data)} bytes (limited to 100)")
print(f"Position: {limited_reader.tell()}")

# Trying to read more returns empty
more_data = limited_reader.read()
print(f"Additional read: {len(more_data)} bytes")
limited_reader.close()

from warcio.limitreader import LimitReader
# Example: conditional stream wrapping via LimitReader.wrap_stream.
from warcio.limitreader import LimitReader

import io

# Test automatic wrapping
large_stream = io.BytesIO(b"x" * 1000)

# Wrap with limit if content length specified
wrapped = LimitReader.wrap_stream(large_stream, content_length=100)
print(f"Wrapped stream type: {type(wrapped).__name__}")

# No wrapping if no content length
unwrapped = LimitReader.wrap_stream(large_stream, content_length=None)
print(f"Unwrapped stream type: {type(unwrapped).__name__}")

from warcio.digestverifyingreader import DigestVerifyingReader, DigestChecker
# Example: digest verification while reading.
from warcio.digestverifyingreader import DigestVerifyingReader, DigestChecker

import io
import hashlib

# Create test data and calculate digest
test_data = b"This is test data for digest verification"
expected_digest = "sha1:" + hashlib.sha1(test_data).hexdigest()

# Set up digest checker
checker = DigestChecker(kind="test")
stream = io.BytesIO(test_data)

# Create verifying reader
verifying_reader = DigestVerifyingReader(
    stream=stream,
    limit=len(test_data),
    digest_checker=checker,
    payload_digest=expected_digest
)

# Begin payload reading (starts digest calculation)
verifying_reader.begin_payload()

# Read data - digest is calculated during reading
data = verifying_reader.read()
print(f"Read {len(data)} bytes")

# Check verification results
print(f"Digest verification passed: {checker.passed}")
if not checker.passed:
    print(f"Problems: {checker.problems}")
verifying_reader.close()

from warcio.bufferedreaders import BufferedReader
# Example: reading a multi-member gzip stream member by member.
from warcio.bufferedreaders import BufferedReader

import gzip
import io

# Create multi-member gzip data
data1 = b"First member data"
data2 = b"Second member data"
compressed1 = gzip.compress(data1)
compressed2 = gzip.compress(data2)
multi_member_data = compressed1 + compressed2

# Read multi-member file
stream = io.BytesIO(multi_member_data)
reader = BufferedReader(stream, decomp_type='gzip', read_all_members=True)

# Read first member
member1 = reader.read()
print(f"First member: {member1}")

# Move to next member
if reader.read_next_member():
    member2 = reader.read()
    print(f"Second member: {member2}")
else:
    print("No second member found")
reader.close()

from warcio.bufferedreaders import BufferedReader, ChunkedDataException
# Example: layered readers with guaranteed cleanup.
from warcio.limitreader import LimitReader
from warcio.bufferedreaders import BufferedReader, ChunkedDataException

import io

stream = io.BytesIO(b"test data")
buffered = None
limited = None
try:
    # Create readers
    buffered = BufferedReader(stream)
    limited = LimitReader(buffered, limit=50)
    # Use readers
    data = limited.read(10)
    print(f"Successfully read {len(data)} bytes")
except Exception as e:
    print(f"Error during reading: {e}")
finally:
    # Always close readers to free resources
    if limited is not None:
        limited.close()
    if buffered is not None:
        buffered.close()

# Install with Tessl CLI
#   npx tessl i tessl/pypi-warcio