Streaming WARC (and ARC) IO library for reading and writing web archive files
Comprehensive HTTP header parsing, manipulation, and formatting with support for status lines, case-insensitive access, and proper encoding handling. The library provides both representation and parsing capabilities for HTTP-style headers used in WARC records.
Main class for representing parsed HTTP status lines and headers with dictionary-like access and manipulation methods.
class StatusAndHeaders:
def __init__(self, statusline, headers, protocol='', total_len=0,
is_http_request=False):
"""
Representation of parsed HTTP-style status line and headers.
Args:
statusline (str): HTTP status line (e.g., '200 OK')
headers (list): List of (name, value) tuples for headers
protocol (str): Protocol string (e.g., 'HTTP/1.1')
total_len (int): Total length of original headers
is_http_request (bool): True if this is a request (splits verb from statusline)
"""
def get_header(self, name, default_value=None):
"""
Get header value by name (case-insensitive).
Args:
name (str): Header name to search for
default_value: Value to return if header not found
Returns:
str or default_value: Header value if found, default_value otherwise
"""
def add_header(self, name, value):
"""
Add a new header.
Args:
name (str): Header name
value (str): Header value
"""
def replace_header(self, name, value):
"""
Replace header with new value or add if not present.
Args:
name (str): Header name
value (str): New header value
Returns:
str or None: Previous header value if replaced, None if added
"""
def remove_header(self, name):
"""
Remove header by name (case-insensitive).
Args:
name (str): Header name to remove
Returns:
bool: True if header was removed, False if not found
"""
def get_statuscode(self):
"""
Extract status code from status line.
Returns:
str: Status code portion of status line
"""
def validate_statusline(self, valid_statusline):
"""
Validate status line and replace if invalid.
Args:
valid_statusline (str): Replacement status line if current is invalid
Returns:
bool: True if original was valid, False if replaced
"""
def add_range(self, start, part_len, total_len):
"""
Add HTTP range headers for partial content responses.
Args:
start (int): Start byte position
part_len (int): Length of partial content
total_len (int): Total content length
Returns:
StatusAndHeaders: Self for method chaining
"""
def compute_headers_buffer(self, header_filter=None):
"""
Pre-compute headers buffer for efficient serialization.
Args:
header_filter (callable): Optional function to filter headers
"""
def to_str(self, filter_func=None):
"""
Convert to string representation.
Args:
filter_func (callable): Optional function to filter headers
Returns:
str: String representation of status and headers
"""
def to_bytes(self, filter_func=None, encoding='utf-8'):
"""
Convert to bytes representation.
Args:
filter_func (callable): Optional function to filter headers
encoding (str): Text encoding to use
Returns:
bytes: Byte representation of status and headers
"""
def to_ascii_bytes(self, filter_func=None):
"""
Convert to ASCII bytes with percent-encoding for non-ASCII characters.
Args:
filter_func (callable): Optional function to filter headers
Returns:
bytes: ASCII-safe byte representation
"""
def percent_encode_non_ascii_headers(self, encoding='UTF-8'):
"""
Percent-encode non-ASCII header values per RFC specifications.
Args:
encoding (str): Encoding to use for percent-encoding
"""
# Dictionary-like interface
def __getitem__(self, key):
"""Get header value by name (same as get_header)."""
def __setitem__(self, key, value):
"""Set header value by name (same as replace_header)."""
def __delitem__(self, key):
"""Delete header by name (same as remove_header)."""
def __contains__(self, key):
"""Check if header exists (case-insensitive)."""

Parser for reading HTTP-style status and headers from streams with support for continuation lines and encoding detection.
class StatusAndHeadersParser:
def __init__(self, statuslist, verify=True):
"""
Parser for HTTP-style status and headers.
Args:
statuslist (list): List of valid status line prefixes
verify (bool): Whether to verify status line format
"""
def parse(self, stream, full_statusline=None):
"""
Parse status line and headers from stream.
Args:
stream: Stream supporting readline() method
full_statusline (str): Pre-read status line (optional)
Returns:
StatusAndHeaders: Parsed status and headers object
Raises:
StatusAndHeadersParserException: If parsing fails
EOFError: If stream is at end
"""
@staticmethod
def split_prefix(key, prefixs):
"""
Split key string by first matching prefix.
Args:
key (str): String to split
prefixs (list): List of prefixes to match against
Returns:
tuple: (matched_prefix, remainder) or None if no match
"""
@staticmethod
def make_warc_id(id_=None):
"""
Generate a WARC record ID.
Args:
id_: Optional UUID to use (generates new one if None)
Returns:
str: WARC record ID in URN format
"""
@staticmethod
def decode_header(line):
"""
Decode header line with proper encoding detection.
Args:
line (bytes or str): Header line to decode
Returns:
str: Decoded header line
"""

Exception class for header parsing errors, with access to the problematic status line.
class StatusAndHeadersParserException(Exception):
def __init__(self, msg, statusline):
"""
Exception for status and headers parsing errors.
Args:
msg (str): Error message
statusline (str): Problematic status line
"""

from warcio.statusandheaders import StatusAndHeaders
# Create status and headers object
headers_list = [
('Content-Type', 'text/html'),
('Content-Length', '1234'),
('Server', 'Apache/2.4.41')
]
status_headers = StatusAndHeaders('200 OK', headers_list)
# Access headers (case-insensitive)
content_type = status_headers.get_header('content-type')
print(f"Content-Type: {content_type}") # text/html
# Dictionary-like access
content_length = status_headers['Content-Length']
print(f"Content-Length: {content_length}") # 1234
# Check if header exists
if 'server' in status_headers:
print(f"Server: {status_headers['server']}")
# Get status code
code = status_headers.get_statuscode()
print(f"Status Code: {code}") # 200

from warcio.statusandheaders import StatusAndHeaders
status_headers = StatusAndHeaders('200 OK', [
('Content-Type', 'text/html'),
('Content-Length', '1234')
])
# Add new header
status_headers.add_header('Cache-Control', 'no-cache')
# Replace existing header
old_length = status_headers.replace_header('Content-Length', '5678')
print(f"Previous length: {old_length}") # 1234
# Remove header
removed = status_headers.remove_header('Cache-Control')
print(f"Header removed: {removed}") # True
# Dictionary-style modification
status_headers['X-Custom-Header'] = 'custom-value'
del status_headers['Content-Type']

from warcio.statusandheaders import StatusAndHeaders
# Create request headers (note is_http_request=True)
request_headers = StatusAndHeaders(
'GET /path HTTP/1.1',
[
('Host', 'example.com'),
('User-Agent', 'Mozilla/5.0'),
('Accept', 'text/html,application/xhtml+xml')
],
is_http_request=True
)
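# With is_http_request=True, the request verb is split off the front of the
# status line and stored in `protocol`; `statusline` keeps the remainder.
print(f"Method and path: {request_headers.statusline}") # /path HTTP/1.1 (remainder after the verb)
print(f"Protocol: {request_headers.protocol}") # GET (the request verb)

from warcio.statusandheaders import StatusAndHeaders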
# Create initial response headers
status_headers = StatusAndHeaders('200 OK', [
('Content-Type', 'application/octet-stream'),
('Content-Length', '10000')
])
# Convert to partial content response
status_headers.add_range(start=1000, part_len=2000, total_len=10000)
print(f"Status: {status_headers.statusline}") # 206 Partial Content
print(f"Content-Range: {status_headers.get_header('Content-Range')}")
# bytes 1000-2999/10000
print(f"Content-Length: {status_headers.get_header('Content-Length')}") # 2000

from warcio.statusandheaders import StatusAndHeadersParser
import io
# Create parser for HTTP responses
parser = StatusAndHeadersParser(['HTTP/1.0', 'HTTP/1.1'])
# Parse headers from stream
header_data = b"""HTTP/1.1 200 OK\r
Content-Type: text/html\r
Content-Length: 1234\r
Server: Apache/2.4.41\r
\r
"""
stream = io.BytesIO(header_data)
status_headers = parser.parse(stream)
print(f"Status: {status_headers.statusline}") # 200 OK
print(f"Protocol: {status_headers.protocol}") # HTTP/1.1
print(f"Content-Type: {status_headers.get_header('Content-Type')}") # text/html

from warcio.statusandheaders import StatusAndHeaders
# Headers with non-ASCII content
headers_with_unicode = StatusAndHeaders('200 OK', [
('Content-Type', 'text/html; charset=utf-8'),
('Content-Disposition', 'attachment; filename="tëst.txt"'),
('X-Custom', 'Héllo Wörld')
])
# Convert to ASCII bytes (automatically percent-encodes non-ASCII)
ascii_bytes = headers_with_unicode.to_ascii_bytes()
print("ASCII-safe representation created")
# Manual percent-encoding of non-ASCII headers
headers_with_unicode.percent_encode_non_ascii_headers()
print("Non-ASCII headers percent-encoded")

from warcio.statusandheaders import StatusAndHeaders
status_headers = StatusAndHeaders('200 OK', [
('Content-Type', 'text/html'),
('Content-Length', '1234'),
('Server', 'Apache/2.4.41'),
('X-Debug', 'sensitive-info')
])
# Define filter function to remove debug headers
def filter_debug_headers(header_tuple):
    """Header filter that drops debug headers and keeps everything else.

    Args:
        header_tuple (tuple): A (name, value) header pair.

    Returns:
        tuple or None: The pair unchanged, or None when the header name
        starts with 'x-debug' (compared case-insensitively), which signals
        that the header should be removed.
    """
    name, _value = header_tuple
    if name.lower().startswith('x-debug'):
        return None  # Remove this header
    return header_tuple  # Keep this header
# Convert to string with filtering
filtered_headers = status_headers.to_str(filter_func=filter_debug_headers)
print("Headers with debug info filtered out")
# Pre-compute filtered buffer for efficient serialization
status_headers.compute_headers_buffer(header_filter=filter_debug_headers)

from warcio.statusandheaders import StatusAndHeaders
# Create headers with potentially invalid status line
status_headers = StatusAndHeaders('Invalid Status', [
('Content-Type', 'text/html')
])
# Validate and fix if necessary
is_valid = status_headers.validate_statusline('200 OK')
if not is_valid:
print("Status line was invalid and has been replaced")
print(f"New status: {status_headers.statusline}") # 200 OK

Install with Tessl CLI
npx tessl i tessl/pypi-warcio