# Command Line Tools

Built-in command line utilities for indexing, checking, extracting, and recompressing WARC/ARC files. These tools provide essential functionality for web archive management and validation.

## Capabilities

### Indexer

Creates JSON indexes of WARC/ARC files for efficient searching and analysis.

```python { .api }
class Indexer:
    def __init__(self, fields, inputs, output, verify_http=False):
        """
        Creates JSON indexes of WARC/ARC files.

        Args:
            fields (list): List of fields to extract for indexing
            inputs (list): List of input file paths to index
            output: Output stream for JSON index data
            verify_http (bool): Whether to verify HTTP headers during indexing
        """

    def process_all(self):
        """
        Process all input files and generate complete index.

        Iterates through all input files and creates JSON index entries
        for each record based on specified fields.
        """

    def process_one(self, input_, output, filename):
        """
        Process a single input file.

        Args:
            input_: Input file stream or path
            output: Output stream for index data
            filename (str): Name of the file being processed
        """

    def get_field(self, record, name, it, filename):
        """
        Extract field value from a record.

        Args:
            record: WARC/ARC record to extract from
            name (str): Field name to extract
            it: Iterator context
            filename (str): Source filename

        Returns:
            Field value for the specified name
        """
```
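
Because the index is emitted as one JSON object per line, it can be consumed with the standard library alone. The following is a minimal sketch, not part of warcio: `load_index`, `offsets_by_url`, and `SAMPLE_INDEX` are hypothetical names, and the sample assumes the fields `offset`, `length`, `warc-type`, and `warc-target-uri` were requested and that values arrive as strings.

```python
import io
import json

# Hypothetical sample of index output in JSON Lines form.
# Real output depends on the fields requested from the indexer.
SAMPLE_INDEX = """\
{"offset": "0", "length": "353", "warc-type": "warcinfo"}
{"offset": "353", "length": "1228", "warc-type": "response", "warc-target-uri": "http://example.com/"}
{"offset": "1581", "length": "586", "warc-type": "request", "warc-target-uri": "http://example.com/"}
"""

def load_index(stream):
    """Parse a JSON Lines index into a list of dicts, skipping blank lines."""
    entries = []
    for line in stream:
        line = line.strip()
        if line:
            entries.append(json.loads(line))
    return entries

def offsets_by_url(entries, url_field="warc-target-uri"):
    """Map each URL to the integer byte offsets of its records."""
    result = {}
    for entry in entries:
        url = entry.get(url_field)
        if url is not None and "offset" in entry:
            result.setdefault(url, []).append(int(entry["offset"]))
    return result

entries = load_index(io.StringIO(SAMPLE_INDEX))
lookup = offsets_by_url(entries)
print(lookup)
```

A lookup table like this is one way to pair the index with offset-based extraction: find the offsets for a URL, then extract the record at each offset.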

### Checker

Verifies WARC file integrity and validates record digests for quality assurance.

```python { .api }
class Checker:
    def __init__(self, cmd):
        """
        Verifies WARC file integrity and digests.

        Args:
            cmd: Command configuration object with checking parameters
        """

    def process_all(self):
        """
        Check all configured input files.

        Performs integrity checking on all files specified in the
        command configuration, validating digests and structure.
        """

    def process_one(self, filename):
        """
        Check integrity of a single file.

        Args:
            filename (str): Path to WARC/ARC file to check

        Validates file structure, record headers, and digest values
        if present in the records.
        """
```
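
To make the idea of digest validation concrete, here is a minimal sketch of comparing a payload against a digest header value. It is not warcio's implementation: `compute_digest` and `digest_matches` are hypothetical helpers, and the sketch assumes the `algorithm:BASE32` form commonly seen in `WARC-Payload-Digest` headers (for example `sha1:...`).

```python
import base64
import hashlib

def compute_digest(data, algorithm="sha1"):
    """Return a digest string in the assumed 'algorithm:BASE32' form."""
    hasher = hashlib.new(algorithm)
    hasher.update(data)
    encoded = base64.b32encode(hasher.digest()).decode("ascii")
    return f"{algorithm}:{encoded}"

def digest_matches(data, header_value):
    """Check whether `data` hashes to the value recorded in a digest header."""
    algorithm, sep, expected = header_value.partition(":")
    if not sep:
        return False  # header is not in 'algorithm:value' form
    try:
        actual = compute_digest(data, algorithm.lower())
    except ValueError:
        return False  # hashlib does not know this algorithm
    return actual.partition(":")[2] == expected.upper()

payload = b"<html>example payload</html>"
header = compute_digest(payload)

print(digest_matches(payload, header))
print(digest_matches(b"tampered payload", header))
```

A record with no digest header cannot be verified this way, which is why a checker can only validate digests when they are present.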

### Extractor

Extracts specific records from WARC/ARC files based on offset positions.

```python { .api }
class Extractor:
    def __init__(self, filename, offset):
        """
        Extracts specific records from WARC/ARC files.

        Args:
            filename (str): Path to WARC/ARC file
            offset (int): Byte offset of record to extract
        """

    def extract(self, payload_only, headers_only):
        """
        Extract and output the record at specified offset.

        Args:
            payload_only (bool): Extract only the payload content
            headers_only (bool): Extract only the headers

        Outputs the extracted content to stdout or configured output.
        """
```
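
Offset-based extraction from a compressed archive depends on each record starting at a gzip member boundary. The sketch below illustrates that idea with the standard library only, assuming one gzip member per record. `read_member_at` is a hypothetical helper, and the record bodies are stand-ins rather than real WARC records.

```python
import gzip
import io
import zlib

def read_member_at(stream, offset, chunk_size=4096):
    """Decompress exactly one gzip member that starts at byte `offset`."""
    stream.seek(offset)
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer
    decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    parts = []
    while not decompressor.eof:
        chunk = stream.read(chunk_size)
        if not chunk:
            break  # stream ended before the member was complete
        parts.append(decompressor.decompress(chunk))
    return b"".join(parts)

# Build a tiny two-member archive in memory
first = gzip.compress(b"record one\n")
second = gzip.compress(b"record two\n")
archive = io.BytesIO(first + second)

# The second member starts right after the first one ends
print(read_member_at(archive, len(first)))
```

The decompressor stops at the end of the member it started in, so the bytes of later records are never decoded.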

### Recompressor

Fixes compression issues in WARC/ARC files by recompressing them so that each record sits in its own gzip member.

```python { .api }
class Recompressor:
    def __init__(self, filename, output, verbose=False):
        """
        Fixes compression issues in WARC/ARC files.

        Args:
            filename (str): Path to input WARC/ARC file
            output (str): Path for output file
            verbose (bool): Enable verbose output during processing
        """

    def recompress(self):
        """
        Recompress the file with proper gzip member boundaries.

        Fixes issues where gzip files contain multiple records in a
        single member, which prevents proper seeking and random access.
        Each record is compressed into its own gzip member.
        """
```
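
The per-record layout described above can be sketched with the standard library. This is an illustration of the file layout only, not the recompressor's actual code: `write_members` is a hypothetical helper and the records are placeholder byte strings.

```python
import gzip
import io

def write_members(records, out):
    """Write each record as its own gzip member and return the start offsets."""
    offsets = []
    for record in records:
        offsets.append(out.tell())        # where this member begins
        out.write(gzip.compress(record))  # one complete gzip member per record
    return offsets

records = [b"record one\n", b"record two\n", b"record three\n"]
buffer = io.BytesIO()
offsets = write_members(records, buffer)
data = buffer.getvalue()

# Concatenated members still decompress as one valid gzip stream
whole = gzip.decompress(data)

# Each member can also be decompressed on its own, given its byte range
second = gzip.decompress(data[offsets[1]:offsets[2]])
print(offsets, second)
```

Keeping the start offset of every member is what makes random access possible: a reader can seek straight to a record without decompressing everything before it.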

### CLI Main Functions

Entry points and version utilities for the command line interface.

```python { .api }
def main(args=None):
    """
    Main CLI entry point for warcio command.

    Args:
        args (list): Command line arguments (uses sys.argv if None)

    Parses command line arguments and dispatches to appropriate
    subcommand (index, check, extract, recompress).
    """

def get_version():
    """
    Get warcio package version.

    Returns:
        str: Current version of warcio package
    """
```
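
The parse-then-dispatch pattern described in the `main` docstring can be sketched with `argparse` subparsers. This is a generic illustration under stated assumptions, not warcio's parser: the program name `archive-tool`, the handlers `run_index` and `run_check`, and the option set shown are all hypothetical.

```python
import argparse

def run_index(cmd):
    """Hypothetical handler: report what would be indexed."""
    return f"index {cmd.inputs} fields={cmd.fields.split(',')}"

def run_check(cmd):
    """Hypothetical handler: report what would be checked."""
    return f"check {cmd.inputs} verbose={cmd.verbose}"

def build_parser():
    parser = argparse.ArgumentParser(prog="archive-tool")  # hypothetical name
    subparsers = parser.add_subparsers(dest="command", required=True)

    index = subparsers.add_parser("index")
    index.add_argument("inputs", nargs="+")
    index.add_argument("-f", "--fields", default="offset")
    index.set_defaults(func=run_index)  # handler travels with the parsed args

    check = subparsers.add_parser("check")
    check.add_argument("inputs", nargs="+")
    check.add_argument("-v", "--verbose", action="store_true")
    check.set_defaults(func=run_check)
    return parser

def main(args=None):
    """Parse arguments (sys.argv when args is None) and call the handler."""
    cmd = build_parser().parse_args(args=args)
    return cmd.func(cmd)

print(main(["check", "-v", "example.warc.gz"]))
```

Accepting an explicit `args` list, as the documented `main(args=None)` does, makes the entry point callable from tests and other Python code without touching `sys.argv`.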

## Usage Examples

### Using Indexer Programmatically

```python
from warcio.indexer import Indexer
import sys

# Define fields to extract for index: 'offset' and 'length' are built in,
# other names are looked up as WARC headers, and an 'http:' prefix
# selects HTTP headers ('http:status' is the HTTP status code)
fields = ['offset', 'length', 'warc-target-uri', 'http:content-type',
          'http:status', 'warc-payload-digest']

# Create indexer
input_files = ['example.warc.gz', 'another.warc.gz']
indexer = Indexer(
    fields=fields,
    inputs=input_files,
    output=sys.stdout,
    verify_http=True
)

# Generate index
indexer.process_all()

# Output is in JSON Lines format, one object per record:
# {"offset": "0", "length": "1234", "warc-target-uri": "http://example.com", ...}
# {"offset": "1234", "length": "5678", "warc-target-uri": "http://example.org", ...}
```

### Using Checker Programmatically

```python
from warcio.checker import Checker

# Create command-like object for checker, which reads `inputs` and `verbose`
class CheckCommand:
    def __init__(self, inputs):
        self.inputs = inputs
        self.verbose = True

cmd = CheckCommand(['test.warc.gz', 'another.warc.gz'])
checker = Checker(cmd)

# Check all files. Digest problems are printed as they are found and
# summarized in the return value (0 means no failures were recorded);
# exceptions indicate files that could not be read or parsed.
try:
    if checker.process_all() == 0:
        print("All files passed integrity checks")
    else:
        print("Digest verification failed for at least one record")
except Exception as e:
    print(f"Integrity check failed: {e}")

# Check single file
try:
    checker.process_one('specific.warc.gz')
    print("File integrity verified")
except Exception as e:
    print(f"File has integrity issues: {e}")
```

### Using Extractor Programmatically

```python
from warcio.extractor import Extractor

# Extract record at specific offset
extractor = Extractor(filename='example.warc.gz', offset=1234)

# Extract complete record (headers + payload)
print("=== Complete Record ===")
extractor.extract(payload_only=False, headers_only=False)

# Extract only payload
print("\n=== Payload Only ===")
extractor.extract(payload_only=True, headers_only=False)

# Extract only headers
print("\n=== Headers Only ===")
extractor.extract(payload_only=False, headers_only=True)
```

### Using Recompressor Programmatically

```python
from warcio.recompressor import Recompressor

# Fix compression issues in a WARC file
recompressor = Recompressor(
    filename='problematic.warc.gz',
    output='fixed.warc.gz',
    verbose=True
)

try:
    recompressor.recompress()
    print("Successfully recompressed file")
    print("Each record is now in its own gzip member for proper seeking")
except Exception as e:
    print(f"Recompression failed: {e}")
```

### Command Line Usage

The tools are primarily designed for command line use via the `warcio` command:

```bash
# Index a WARC file
warcio index --fields warc-target-uri,http:content-type,http:status example.warc.gz

# Check file integrity
warcio check example.warc.gz

# Extract record at specific offset
warcio extract example.warc.gz 1234

# Extract only payload
warcio extract --payload example.warc.gz 1234

# Extract only headers
warcio extract --headers example.warc.gz 1234

# Recompress to fix gzip issues
warcio recompress problematic.warc.gz fixed.warc.gz

# Get version
warcio --version
```

### Batch Processing with Tools

```python
from warcio.indexer import Indexer
from warcio.checker import Checker
import glob

# Process all WARC files in directory
warc_files = glob.glob('*.warc.gz')

# Create comprehensive index; the with-block closes the output file
print("Creating index...")
with open('complete_index.jsonl', 'w') as index_output:
    indexer = Indexer(
        fields=['offset', 'length', 'warc-target-uri', 'http:content-type',
                'http:status', 'warc-payload-digest', 'warc-date'],
        inputs=warc_files,
        output=index_output,
        verify_http=True
    )
    indexer.process_all()

# Verify all files
print("Checking file integrity...")
class BatchCheckCommand:
    def __init__(self, inputs):
        # Checker reads `inputs` and `verbose` from the command object
        self.inputs = inputs
        self.verbose = False

checker = Checker(BatchCheckCommand(warc_files))

failed_files = []
for filename in warc_files:
    try:
        checker.process_one(filename)
        print(f"✓ {filename}")
    except Exception as e:
        print(f"✗ {filename}: {e}")
        failed_files.append(filename)

print(f"\nSummary: {len(warc_files) - len(failed_files)}/{len(warc_files)} files passed")
if failed_files:
    print(f"Failed files: {failed_files}")
```

### Custom Field Extraction

```python
from warcio.indexer import Indexer
import sys

class CustomIndexer(Indexer):
    def get_field(self, record, name, it, filename):
        """Override to add custom field extraction."""

        # Standard fields
        if name == 'url':
            return record.rec_headers.get_header('WARC-Target-URI')
        elif name == 'type':
            return record.rec_type
        elif name == 'date':
            return record.rec_headers.get_header('WARC-Date')
        elif name == 'filename':
            return filename

        # Custom fields
        elif name == 'has_http_headers':
            return bool(record.http_headers)
        elif name == 'content_length':
            if record.http_headers:
                return record.http_headers.get_header('Content-Length')
            return None
        elif name == 'server':
            if record.http_headers:
                return record.http_headers.get_header('Server')
            return None

        # Fallback to parent implementation
        return super().get_field(record, name, it, filename)

# Use custom indexer
custom_fields = ['url', 'type', 'date', 'has_http_headers', 'content_length', 'server']
indexer = CustomIndexer(
    fields=custom_fields,
    inputs=['example.warc.gz'],
    output=sys.stdout,
    verify_http=True
)
indexer.process_all()
```

### Integration with Archive Processing

```python
from warcio.checker import Checker
from warcio.recompressor import Recompressor
from warcio.indexer import Indexer
import os
import tempfile

def process_archive_pipeline(input_file):
    """Complete pipeline for processing a WARC archive."""

    # Step 1: Check integrity
    print(f"Checking {input_file}...")
    class SimpleCommand:
        def __init__(self, inputs):
            # Checker reads `inputs` and `verbose` from the command object
            self.inputs = inputs
            self.verbose = False

    checker = Checker(SimpleCommand([input_file]))

    try:
        checker.process_one(input_file)
        print("✓ Integrity check passed")
        processed_file = input_file
    except Exception as e:
        print(f"⚠ Integrity issues detected: {e}")

        # Step 2: Try to fix with recompression
        print("Attempting to fix with recompression...")
        # mkstemp creates the file safely (mktemp is deprecated); close the
        # descriptor so the recompressor can write to the path itself
        fd, temp_file = tempfile.mkstemp(suffix='.warc.gz')
        os.close(fd)

        recompressor = Recompressor(input_file, temp_file, verbose=True)
        recompressor.recompress()

        # Verify the fixed file
        checker.process_one(temp_file)
        print("✓ File fixed and verified")
        processed_file = temp_file

    # Step 3: Create index
    print("Creating index...")
    index_file = input_file.replace('.warc.gz', '_index.jsonl')

    with open(index_file, 'w') as output:
        indexer = Indexer(
            fields=['offset', 'length', 'warc-target-uri',
                    'http:content-type', 'http:status'],
            inputs=[processed_file],
            output=output,
            verify_http=True
        )
        indexer.process_all()

    print(f"✓ Index created: {index_file}")

    # Cleanup temporary file if created
    if processed_file != input_file:
        os.unlink(processed_file)

    return index_file

# Process an archive
index_file = process_archive_pipeline('example.warc.gz')
print(f"Pipeline complete. Index available at: {index_file}")
```