Streaming WARC (and ARC) I/O library for reading and writing web archive files.
Built-in command line utilities for indexing, checking, extracting, and recompressing WARC/ARC files. These tools provide essential functionality for web archive management and validation.
class Indexer:
    """Creates JSON indexes of WARC/ARC files for efficient searching and analysis."""

    def __init__(self, fields, inputs, output, verify_http=False):
        """
        Create an indexer over the given inputs.

        Args:
            fields (list): List of fields to extract for indexing.
            inputs (list): List of input file paths to index.
            output: Output stream for JSON index data.
            verify_http (bool): Whether to verify HTTP headers during indexing.
        """

    def process_all(self):
        """
        Process all input files and generate the complete index.

        Iterates through all input files and creates JSON index entries
        for each record based on the specified fields.
        """

    def process_one(self, input_, output, filename):
        """
        Process a single input file.

        Args:
            input_: Input file stream or path.
            output: Output stream for index data.
            filename (str): Name of the file being processed.
        """

    def get_field(self, record, name, it, filename):
        """
        Extract a field value from a record.

        Args:
            record: WARC/ARC record to extract from.
            name (str): Field name to extract.
            it: Iterator context.
            filename (str): Source filename.

        Returns:
            Field value for the specified name.
        """

# Verifies WARC file integrity and digest validation for quality assurance.
class Checker:
    """Verifies WARC file integrity and digests for quality assurance."""

    def __init__(self, cmd):
        """
        Create a checker from a command configuration.

        Args:
            cmd: Command configuration object with checking parameters.
        """

    def process_all(self):
        """
        Check all configured input files.

        Performs integrity checking on all files specified in the
        command configuration, validating digests and structure.
        """

    def process_one(self, filename):
        """
        Check integrity of a single file.

        Validates file structure, record headers, and digest values
        if present in the records.

        Args:
            filename (str): Path to WARC/ARC file to check.
        """

# Extracts specific records from WARC/ARC files based on offset positions.
class Extractor:
    """Extracts specific records from WARC/ARC files based on offset positions."""

    def __init__(self, filename, offset):
        """
        Create an extractor for one record.

        Args:
            filename (str): Path to WARC/ARC file.
            offset (int): Byte offset of record to extract.
        """

    def extract(self, payload_only, headers_only):
        """
        Extract and output the record at the specified offset.

        Outputs the extracted content to stdout or the configured output.

        Args:
            payload_only (bool): Extract only the payload content.
            headers_only (bool): Extract only the headers.
        """

# Fixes compression issues in WARC/ARC files by recompressing with proper chunking.
class Recompressor:
    """Fixes compression issues in WARC/ARC files by recompressing with proper chunking."""

    def __init__(self, filename, output, verbose=False):
        """
        Create a recompressor for the given file.

        Args:
            filename (str): Path to input WARC/ARC file.
            output (str): Path for output file.
            verbose (bool): Enable verbose output during processing.
        """

    def recompress(self):
        """
        Recompress the file with proper gzip member boundaries.

        Fixes issues where gzip files contain multiple records in a
        single member, which prevents proper seeking and random access.
        Each record is compressed into its own gzip member.
        """

# Entry points and version utilities for command line interface.
def main(args=None):
    """
    Main CLI entry point for the warcio command.

    Parses command line arguments and dispatches to the appropriate
    subcommand (index, check, extract, recompress).

    Args:
        args (list): Command line arguments (uses sys.argv if None).
    """

def get_version():
    """
    Get the warcio package version.

    Returns:
        str: Current version of the warcio package.
    """

# --- Usage example: indexing ---
from warcio.indexer import Indexer
import sys

# Define the fields to extract for the index
fields = ['offset', 'length', 'url', 'mime', 'status', 'digest']

# Create the indexer over the input files
input_files = ['example.warc.gz', 'another.warc.gz']
indexer = Indexer(
    fields=fields,
    inputs=input_files,
    output=sys.stdout,
    verify_http=True
)

# Generate the index
indexer.process_all()
# Output will be JSON lines format:
# {"offset": 0, "length": 1234, "url": "http://example.com", ...}
# {"offset": 1234, "length": 5678, "url": "http://example.org", ...}

# --- Usage example: checking ---
from warcio.checker import Checker
# Create a command-like object for the checker
class CheckCommand:
    def __init__(self, files):
        self.files = files
        self.verbose = True

cmd = CheckCommand(['test.warc.gz', 'another.warc.gz'])
checker = Checker(cmd)

# Check all files
try:
    checker.process_all()
    print("All files passed integrity checks")
except Exception as e:
    print(f"Integrity check failed: {e}")

# Check a single file
try:
    checker.process_one('specific.warc.gz')
    print("File integrity verified")
except Exception as e:
    print(f"File has integrity issues: {e}")

# --- Usage example: extracting ---
from warcio.extractor import Extractor
# Extract the record at a specific offset
extractor = Extractor(filename='example.warc.gz', offset=1234)

# Extract complete record (headers + payload)
print("=== Complete Record ===")
extractor.extract(payload_only=False, headers_only=False)

# Extract only the payload
print("\n=== Payload Only ===")
extractor.extract(payload_only=True, headers_only=False)

# Extract only the headers
print("\n=== Headers Only ===")
extractor.extract(payload_only=False, headers_only=True)

# --- Usage example: recompressing ---
from warcio.recompressor import Recompressor
# Fix compression issues in a WARC file
recompressor = Recompressor(
    filename='problematic.warc.gz',
    output='fixed.warc.gz',
    verbose=True
)

try:
    recompressor.recompress()
    print("Successfully recompressed file")
    print("Each record is now in its own gzip member for proper seeking")
except Exception as e:
    print(f"Recompression failed: {e}")

# The tools are primarily designed for command line use via the warcio command:
# Index a WARC file
warcio index --fields url,mime,status example.warc.gz
# Check file integrity
warcio check example.warc.gz
# Extract record at specific offset
warcio extract example.warc.gz 1234
# Extract only payload
warcio extract --payload-only example.warc.gz 1234
# Extract only headers
warcio extract --headers-only example.warc.gz 1234
# Recompress to fix gzip issues
warcio recompress problematic.warc.gz fixed.warc.gz
# Get version
warcio --version
from warcio.indexer import Indexer
from warcio.checker import Checker
import glob
import json
import sys

# Process all WARC files in the directory
warc_files = glob.glob('*.warc.gz')

# Create a comprehensive index
print("Creating index...")
indexer = Indexer(
    fields=['offset', 'length', 'url', 'mime', 'status', 'digest', 'date'],
    inputs=warc_files,
    output=open('complete_index.jsonl', 'w'),
    verify_http=True
)
indexer.process_all()

# Verify all files
print("Checking file integrity...")
class BatchCheckCommand:
    def __init__(self, files):
        self.files = files
        self.verbose = False

checker = Checker(BatchCheckCommand(warc_files))
failed_files = []
for filename in warc_files:
    try:
        checker.process_one(filename)
        print(f"✓ {filename}")
    except Exception as e:
        print(f"✗ {filename}: {e}")
        failed_files.append(filename)

print(f"\nSummary: {len(warc_files) - len(failed_files)}/{len(warc_files)} files passed")
if failed_files:
    print(f"Failed files: {failed_files}")

# --- Usage example: custom indexing ---
from warcio.indexer import Indexer
import sys

class CustomIndexer(Indexer):
    def get_field(self, record, name, it, filename):
        """Override to add custom field extraction."""
        # Standard fields
        if name == 'url':
            return record.rec_headers.get_header('WARC-Target-URI')
        elif name == 'type':
            return record.rec_type
        elif name == 'date':
            return record.rec_headers.get_header('WARC-Date')
        elif name == 'filename':
            return filename
        # Custom fields
        elif name == 'has_http_headers':
            return bool(record.http_headers)
        elif name == 'content_length':
            if record.http_headers:
                return record.http_headers.get_header('Content-Length')
            return None
        elif name == 'server':
            if record.http_headers:
                return record.http_headers.get_header('Server')
            return None
        # Fall back to the parent implementation
        return super().get_field(record, name, it, filename)

# Use the custom indexer
custom_fields = ['url', 'type', 'date', 'has_http_headers', 'content_length', 'server']
indexer = CustomIndexer(
    fields=custom_fields,
    inputs=['example.warc.gz'],
    output=sys.stdout,
    verify_http=True
)
indexer.process_all()

# --- Usage example: full pipeline ---
from warcio.checker import Checker
from warcio.recompressor import Recompressor
from warcio.indexer import Indexer
import os
import tempfile

def process_archive_pipeline(input_file):
    """Complete pipeline for processing a WARC archive."""
    # Step 1: Check integrity
    print(f"Checking {input_file}...")

    class SimpleCommand:
        def __init__(self, files):
            self.files = files

    checker = Checker(SimpleCommand([input_file]))
    try:
        checker.process_one(input_file)
        print("✓ Integrity check passed")
        processed_file = input_file
    except Exception as e:
        print(f"⚠ Integrity issues detected: {e}")
        # Step 2: Try to fix with recompression
        print("Attempting to fix with recompression...")
        # mkstemp instead of the deprecated, race-prone tempfile.mktemp
        fd, temp_file = tempfile.mkstemp(suffix='.warc.gz')
        os.close(fd)
        recompressor = Recompressor(input_file, temp_file, verbose=True)
        recompressor.recompress()
        # Verify the fixed file
        checker.process_one(temp_file)
        print("✓ File fixed and verified")
        processed_file = temp_file

    # Step 3: Create index
    print("Creating index...")
    index_file = input_file.replace('.warc.gz', '_index.jsonl')
    with open(index_file, 'w') as output:
        indexer = Indexer(
            fields=['offset', 'length', 'url', 'mime', 'status'],
            inputs=[processed_file],
            output=output,
            verify_http=True
        )
        indexer.process_all()
    print(f"✓ Index created: {index_file}")

    # Cleanup temporary file if created
    if processed_file != input_file:
        os.unlink(processed_file)

    return index_file

# Process an archive
index_file = process_archive_pipeline('example.warc.gz')
print(f"Pipeline complete. Index available at: {index_file}")

# Install with Tessl CLI
npx tessl i tessl/pypi-warcio