CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-warcio

Streaming WARC (and ARC) IO library for reading and writing web archive files

Overview
Eval results
Files

docs/cli-tools.md

Command Line Tools

Built-in command line utilities for indexing, checking, extracting, and recompressing WARC/ARC files. These tools provide essential functionality for web archive management and validation.

Capabilities

Indexer

Creates JSON indexes of WARC/ARC files for efficient searching and analysis.

class Indexer:
    """Creates JSON (JSON-lines) indexes of WARC/ARC files for searching and analysis."""

    def __init__(self, fields: list, inputs: list, output, verify_http: bool = False):
        """
        Initialize the indexer.

        Args:
            fields (list): Names of the fields to extract for each index entry
            inputs (list): Input WARC/ARC file paths to index
            output: Writable stream that receives the JSON index lines
            verify_http (bool): Whether to parse/verify HTTP headers during indexing
        """
    
    def process_all(self):
        """
        Process all input files and generate the complete index.
        
        Iterates through every configured input file and writes one JSON
        index entry per record, based on the specified fields.
        """
    
    def process_one(self, input_, output, filename: str):
        """
        Index a single input file.
        
        Args:
            input_: Input file stream or path
            output: Output stream for index data
            filename (str): Name of the file being processed
        """
    
    def get_field(self, record, name: str, it, filename: str):
        """
        Extract one field value from a record.
        
        Args:
            record: WARC/ARC record to extract from
            name (str): Field name to extract
            it: Iterator context
            filename (str): Source filename
            
        Returns:
            Field value for the specified name
        """

Checker

Verifies WARC file integrity and digest validation for quality assurance.

class Checker:
    """Verifies WARC/ARC file integrity and digest values for quality assurance."""

    def __init__(self, cmd):
        """
        Initialize the checker.

        Args:
            cmd: Command configuration object with checking parameters
                (expected to carry the input file list and option flags)
        """
    
    def process_all(self):
        """
        Check all configured input files.
        
        Performs integrity checking on every file specified in the
        command configuration, validating digests and structure.
        """
    
    def process_one(self, filename: str):
        """
        Check integrity of a single file.
        
        Args:
            filename (str): Path to the WARC/ARC file to check
            
        Validates file structure, record headers, and digest values
        if present in the records.
        """

Extractor

Extracts specific records from WARC/ARC files based on offset positions.

class Extractor:
    """Extracts a single record from a WARC/ARC file at a given byte offset."""

    def __init__(self, filename: str, offset: int):
        """
        Initialize the extractor.

        Args:
            filename (str): Path to the WARC/ARC file
            offset (int): Byte offset of the record to extract
        """
    
    def extract(self, payload_only: bool, headers_only: bool):
        """
        Extract and output the record at the configured offset.
        
        Args:
            payload_only (bool): Extract only the payload content
            headers_only (bool): Extract only the headers
            
        Outputs the extracted content to stdout or the configured output.
        """

Recompressor

Fixes compression issues in WARC/ARC files by recompressing with proper chunking.

class Recompressor:
    """Fixes compression issues in WARC/ARC files by recompressing with proper chunking."""

    def __init__(self, filename: str, output: str, verbose: bool = False):
        """
        Initialize the recompressor.

        Args:
            filename (str): Path to the input WARC/ARC file
            output (str): Path for the recompressed output file
            verbose (bool): Enable verbose output during processing
        """
    
    def recompress(self):
        """
        Recompress the file with proper gzip member boundaries.
        
        Fixes issues where gzip files contain multiple records in a
        single member, which prevents proper seeking and random access.
        Each record is compressed into its own gzip member.
        """

CLI Main Functions

Entry points and version utilities for command line interface.

def main(args=None):
    """
    Main CLI entry point for the ``warcio`` command.
    
    Args:
        args (list): Command line arguments; falls back to ``sys.argv`` when None
        
    Parses command line arguments and dispatches to the appropriate
    subcommand (index, check, extract, recompress).
    """

def get_version():
    """
    Get the warcio package version.
    
    Returns:
        str: Current version of the warcio package (as shown by ``--version``)
    """

Usage Examples

Using Indexer Programmatically

from warcio.indexer import Indexer
import sys

# Fields to record for every index entry
index_fields = ['offset', 'length', 'url', 'mime', 'status', 'digest']

# Build an indexer over the archives to scan
archives = ['example.warc.gz', 'another.warc.gz']
idx = Indexer(
    fields=index_fields,
    inputs=archives,
    output=sys.stdout,
    verify_http=True
)

# Write the index: JSON-lines, one object per record
idx.process_all()

# Example output:
# {"offset": 0, "length": 1234, "url": "http://example.com", ...}
# {"offset": 1234, "length": 5678, "url": "http://example.org", ...}

Using Checker Programmatically

from warcio.checker import Checker

class CheckCommand:
    """Minimal stand-in for the parsed-arguments object Checker expects."""
    def __init__(self, files):
        self.files = files
        self.verbose = True

checker = Checker(CheckCommand(['test.warc.gz', 'another.warc.gz']))

# Verify every configured file in one pass
try:
    checker.process_all()
    print("All files passed integrity checks")
except Exception as e:
    print(f"Integrity check failed: {e}")

# Verify one specific file
try:
    checker.process_one('specific.warc.gz')
    print("File integrity verified")
except Exception as e:
    print(f"File has integrity issues: {e}")

Using Extractor Programmatically

from warcio.extractor import Extractor

# Point the extractor at the record starting at byte offset 1234
record_extractor = Extractor(filename='example.warc.gz', offset=1234)

# Headers + payload
print("=== Complete Record ===")
record_extractor.extract(payload_only=False, headers_only=False)

# Payload without headers
print("\n=== Payload Only ===")
record_extractor.extract(payload_only=True, headers_only=False)

# Headers without payload
print("\n=== Headers Only ===")
record_extractor.extract(payload_only=False, headers_only=True)

Using Recompressor Programmatically

from warcio.recompressor import Recompressor

# Rewrite the archive so every record sits in its own gzip member
fixer = Recompressor(
    filename='problematic.warc.gz',
    output='fixed.warc.gz',
    verbose=True
)

try:
    fixer.recompress()
except Exception as e:
    print(f"Recompression failed: {e}")
else:
    print("Successfully recompressed file")
    print("Each record is now in its own gzip member for proper seeking")

Command Line Usage

The tools are primarily designed for command line use via the warcio command:

# Index a WARC file
warcio index --fields url,mime,status example.warc.gz

# Check file integrity
warcio check example.warc.gz

# Extract record at specific offset
warcio extract example.warc.gz 1234

# Extract only payload
warcio extract --payload-only example.warc.gz 1234

# Extract only headers
warcio extract --headers-only example.warc.gz 1234

# Recompress to fix gzip issues
warcio recompress problematic.warc.gz fixed.warc.gz

# Get version
warcio --version

Batch Processing with Tools

from warcio.indexer import Indexer
from warcio.checker import Checker
import glob
import json
import sys

# Process all WARC files in the current directory
warc_files = glob.glob('*.warc.gz')

# Create a comprehensive index.
# Fix: open the output inside `with` so the handle is flushed and closed
# (the original leaked the file object passed as `output=open(...)`).
print("Creating index...")
with open('complete_index.jsonl', 'w') as index_out:
    indexer = Indexer(
        fields=['offset', 'length', 'url', 'mime', 'status', 'digest', 'date'],
        inputs=warc_files,
        output=index_out,
        verify_http=True
    )
    indexer.process_all()

# Verify all files
print("Checking file integrity...")
class BatchCheckCommand:
    """Minimal stand-in for the parsed-arguments object Checker expects."""
    def __init__(self, files):
        self.files = files
        self.verbose = False

checker = Checker(BatchCheckCommand(warc_files))

failed_files = []
for filename in warc_files:
    try:
        checker.process_one(filename)
        # Fix: report the actual file name (was a garbled "(unknown)" placeholder)
        print(f"✓ {filename}")
    except Exception as e:
        print(f"✗ {filename}: {e}")
        failed_files.append(filename)

print(f"\nSummary: {len(warc_files) - len(failed_files)}/{len(warc_files)} files passed")
if failed_files:
    print(f"Failed files: {failed_files}")

Custom Field Extraction

from warcio.indexer import Indexer
import sys

class CustomIndexer(Indexer):
    def get_field(self, record, name, it, filename):
        """Extend field extraction with HTTP-derived custom fields."""

        # Standard fields
        if name == 'url':
            return record.rec_headers.get_header('WARC-Target-URI')
        if name == 'type':
            return record.rec_type
        if name == 'date':
            return record.rec_headers.get_header('WARC-Date')
        if name == 'filename':
            return filename

        # Custom fields derived from the captured HTTP exchange
        if name == 'has_http_headers':
            return bool(record.http_headers)
        if name == 'content_length':
            return record.http_headers.get_header('Content-Length') if record.http_headers else None
        if name == 'server':
            return record.http_headers.get_header('Server') if record.http_headers else None

        # Anything else: defer to the stock implementation
        return super().get_field(record, name, it, filename)

# Use the custom indexer
custom_fields = ['url', 'type', 'date', 'has_http_headers', 'content_length', 'server']
indexer = CustomIndexer(
    fields=custom_fields,
    inputs=['example.warc.gz'],
    output=sys.stdout,
    verify_http=True
)
indexer.process_all()

Integration with Archive Processing

from warcio.checker import Checker
from warcio.recompressor import Recompressor
from warcio.indexer import Indexer
import os
import tempfile

def process_archive_pipeline(input_file):
    """Check, repair if needed, and index a WARC archive.

    Args:
        input_file (str): Path to a ``.warc.gz`` archive.

    Returns:
        str: Path of the JSONL index file that was written.
    """
    # Step 1: Check integrity
    print(f"Checking {input_file}...")
    class SimpleCommand:
        """Minimal stand-in for the parsed-arguments object Checker expects."""
        def __init__(self, files):
            self.files = files
            self.verbose = False  # Fix: other command stand-ins provide this flag; Checker may read it

    checker = Checker(SimpleCommand([input_file]))

    try:
        checker.process_one(input_file)
        print("✓ Integrity check passed")
        processed_file = input_file
    except Exception as e:
        print(f"⚠ Integrity issues detected: {e}")

        # Step 2: Try to fix with recompression.
        # Fix: tempfile.mktemp() is deprecated and racy; mkstemp() creates
        # the file atomically. Close the fd — Recompressor opens by path.
        fd, temp_file = tempfile.mkstemp(suffix='.warc.gz')
        os.close(fd)

        recompressor = Recompressor(input_file, temp_file, verbose=True)
        recompressor.recompress()

        # Verify the fixed file
        checker.process_one(temp_file)
        print("✓ File fixed and verified")
        processed_file = temp_file

    # Step 3: Create index
    print("Creating index...")
    index_file = input_file.replace('.warc.gz', '_index.jsonl')

    with open(index_file, 'w') as output:
        indexer = Indexer(
            fields=['offset', 'length', 'url', 'mime', 'status'],
            inputs=[processed_file],
            output=output,
            verify_http=True
        )
        indexer.process_all()

    print(f"✓ Index created: {index_file}")

    # Cleanup temporary file if created
    if processed_file != input_file:
        os.unlink(processed_file)

    return index_file

# Process an archive
index_file = process_archive_pipeline('example.warc.gz')
print(f"Pipeline complete. Index available at: {index_file}")

Install with Tessl CLI

npx tessl i tessl/pypi-warcio

docs

archive-reading.md

cli-tools.md

http-capture.md

http-headers.md

index.md

stream-processing.md

time-utilities.md

warc-writing.md

tile.json