or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

archive-reading.md cli-tools.md http-capture.md http-headers.md index.md stream-processing.md time-utilities.md warc-writing.md

cli-tools.md docs/

0

# Command Line Tools

1

2

Built-in command line utilities for indexing, checking, extracting, and recompressing WARC/ARC files. These tools provide essential functionality for web archive management and validation.

3

4

## Capabilities

5

6

### Indexer

7

8

Creates JSON indexes of WARC/ARC files for efficient searching and analysis.

9

10

```python { .api }

11

class Indexer:

12

def __init__(self, fields, inputs, output, verify_http=False):

13

"""

14

Creates JSON indexes of WARC/ARC files.

15

16

Args:

17

fields (list): List of fields to extract for indexing

18

inputs (list): List of input file paths to index

19

output: Output stream for JSON index data

20

verify_http (bool): Whether to verify HTTP headers during indexing

21

"""

22

23

def process_all(self):

24

"""

25

Process all input files and generate complete index.

26

27

Iterates through all input files and creates JSON index entries

28

for each record based on specified fields.

29

"""

30

31

def process_one(self, input_, output, filename):

32

"""

33

Process a single input file.

34

35

Args:

36

input_: Input file stream or path

37

output: Output stream for index data

38

filename (str): Name of the file being processed

39

"""

40

41

def get_field(self, record, name, it, filename):

42

"""

43

Extract field value from a record.

44

45

Args:

46

record: WARC/ARC record to extract from

47

name (str): Field name to extract

48

it: Iterator context

49

filename (str): Source filename

50

51

Returns:

52

Field value for the specified name

53

"""

54

```

55

56

### Checker

57

58

Verifies WARC file integrity and validates record digests for quality assurance.

59

60

```python { .api }

61

class Checker:

62

def __init__(self, cmd):

63

"""

64

Verifies WARC file integrity and digests.

65

66

Args:

67

cmd: Command configuration object with checking parameters

68

"""

69

70

def process_all(self):

71

"""

72

Check all configured input files.

73

74

Performs integrity checking on all files specified in the

75

command configuration, validating digests and structure.

76

"""

77

78

def process_one(self, filename):

79

"""

80

Check integrity of a single file.

81

82

Args:

83

filename (str): Path to WARC/ARC file to check

84

85

Validates file structure, record headers, and digest values

86

if present in the records.

87

"""

88

```

89

90

### Extractor

91

92

Extracts specific records from WARC/ARC files based on offset positions.

93

94

```python { .api }

95

class Extractor:

96

def __init__(self, filename, offset):

97

"""

98

Extracts specific records from WARC/ARC files.

99

100

Args:

101

filename (str): Path to WARC/ARC file

102

offset (int): Byte offset of record to extract

103

"""

104

105

def extract(self, payload_only, headers_only):

106

"""

107

Extract and output the record at specified offset.

108

109

Args:

110

payload_only (bool): Extract only the payload content

111

headers_only (bool): Extract only the headers

112

113

Outputs the extracted content to stdout or configured output.

114

"""

115

```

116

117

### Recompressor

118

119

Fixes compression issues in WARC/ARC files by recompressing with proper chunking.

120

121

```python { .api }

122

class Recompressor:

123

def __init__(self, filename, output, verbose=False):

124

"""

125

Fixes compression issues in WARC/ARC files.

126

127

Args:

128

filename (str): Path to input WARC/ARC file

129

output (str): Path for output file

130

verbose (bool): Enable verbose output during processing

131

"""

132

133

def recompress(self):

134

"""

135

Recompress the file with proper gzip member boundaries.

136

137

Fixes issues where gzip files contain multiple records in a

138

single member, which prevents proper seeking and random access.

139

Each record is compressed into its own gzip member.

140

"""

141

```

142

143

### CLI Main Functions

144

145

Entry points and version utilities for command line interface.

146

147

```python { .api }

148

def main(args=None):

149

"""

150

Main CLI entry point for warcio command.

151

152

Args:

153

args (list): Command line arguments (uses sys.argv if None)

154

155

Parses command line arguments and dispatches to appropriate

156

subcommand (index, check, extract, recompress).

157

"""

158

159

def get_version():

160

"""

161

Get warcio package version.

162

163

Returns:

164

str: Current version of warcio package

165

"""

166

```

167

168

## Usage Examples

169

170

### Using Indexer Programmatically

171

172

```python

173

from warcio.indexer import Indexer

174

import sys

175

176

# Define fields to extract for index

177

fields = ['offset', 'length', 'url', 'mime', 'status', 'digest']

178

179

# Create indexer

180

input_files = ['example.warc.gz', 'another.warc.gz']

181

indexer = Indexer(

182

fields=fields,

183

inputs=input_files,

184

output=sys.stdout,

185

verify_http=True

186

)

187

188

# Generate index

189

indexer.process_all()

190

191

# Output will be JSON lines format:

192

# {"offset": 0, "length": 1234, "url": "http://example.com", ...}

193

# {"offset": 1234, "length": 5678, "url": "http://example.org", ...}

194

```

195

196

### Using Checker Programmatically

197

198

```python

199

from warcio.checker import Checker

200

201

# Create command-like object for checker

202

class CheckCommand:

203

def __init__(self, files):

204

self.files = files

205

self.verbose = True

206

207

cmd = CheckCommand(['test.warc.gz', 'another.warc.gz'])

208

checker = Checker(cmd)

209

210

# Check all files

211

try:

212

checker.process_all()

213

print("All files passed integrity checks")

214

except Exception as e:

215

print(f"Integrity check failed: {e}")

216

217

# Check single file

218

try:

219

checker.process_one('specific.warc.gz')

220

print("File integrity verified")

221

except Exception as e:

222

print(f"File has integrity issues: {e}")

223

```

224

225

### Using Extractor Programmatically

226

227

```python

228

from warcio.extractor import Extractor

229

230

# Extract record at specific offset

231

extractor = Extractor(filename='example.warc.gz', offset=1234)

232

233

# Extract complete record (headers + payload)

234

print("=== Complete Record ===")

235

extractor.extract(payload_only=False, headers_only=False)

236

237

# Extract only payload

238

print("\n=== Payload Only ===")

239

extractor.extract(payload_only=True, headers_only=False)

240

241

# Extract only headers

242

print("\n=== Headers Only ===")

243

extractor.extract(payload_only=False, headers_only=True)

244

```

245

246

### Using Recompressor Programmatically

247

248

```python

249

from warcio.recompressor import Recompressor

250

251

# Fix compression issues in a WARC file

252

recompressor = Recompressor(

253

filename='problematic.warc.gz',

254

output='fixed.warc.gz',

255

verbose=True

256

)

257

258

try:

259

recompressor.recompress()

260

print("Successfully recompressed file")

261

print("Each record is now in its own gzip member for proper seeking")

262

except Exception as e:

263

print(f"Recompression failed: {e}")

264

```

265

266

### Command Line Usage

267

268

The tools are primarily designed for command line use via the `warcio` command:

269

270

```bash

271

# Index a WARC file

272

warcio index --fields url,mime,status example.warc.gz

273

274

# Check file integrity

275

warcio check example.warc.gz

276

277

# Extract record at specific offset

278

warcio extract example.warc.gz 1234

279

280

# Extract only payload

281

warcio extract --payload-only example.warc.gz 1234

282

283

# Extract only headers

284

warcio extract --headers-only example.warc.gz 1234

285

286

# Recompress to fix gzip issues

287

warcio recompress problematic.warc.gz fixed.warc.gz

288

289

# Get version

290

warcio --version

291

```

292

293

### Batch Processing with Tools

294

295

```python

296

from warcio.indexer import Indexer

297

from warcio.checker import Checker

298

import glob

299

import json

300

import sys

301

302

# Process all WARC files in directory

303

warc_files = glob.glob('*.warc.gz')

304

305

# Create comprehensive index

306

print("Creating index...")

307

indexer = Indexer(

308

fields=['offset', 'length', 'url', 'mime', 'status', 'digest', 'date'],

309

inputs=warc_files,

310

output=open('complete_index.jsonl', 'w'),

311

verify_http=True

312

)

313

indexer.process_all()

314

315

# Verify all files

316

print("Checking file integrity...")

317

class BatchCheckCommand:

318

def __init__(self, files):

319

self.files = files

320

self.verbose = False

321

322

checker = Checker(BatchCheckCommand(warc_files))

323

324

failed_files = []

325

for filename in warc_files:

326

try:

327

checker.process_one(filename)

328

print(f"✓ {filename}")

329

except Exception as e:

330

print(f"✗ {filename}: {e}")

331

failed_files.append(filename)

332

333

print(f"\nSummary: {len(warc_files) - len(failed_files)}/{len(warc_files)} files passed")

334

if failed_files:

335

print(f"Failed files: {failed_files}")

336

```

337

338

### Custom Field Extraction

339

340

```python

341

from warcio.indexer import Indexer

342

import sys

343

344

class CustomIndexer(Indexer):

345

def get_field(self, record, name, it, filename):

346

"""Override to add custom field extraction."""

347

348

# Standard fields

349

if name == 'url':

350

return record.rec_headers.get_header('WARC-Target-URI')

351

elif name == 'type':

352

return record.rec_type

353

elif name == 'date':

354

return record.rec_headers.get_header('WARC-Date')

355

elif name == 'filename':

356

return filename

357

358

# Custom fields

359

elif name == 'has_http_headers':

360

return bool(record.http_headers)

361

elif name == 'content_length':

362

if record.http_headers:

363

return record.http_headers.get_header('Content-Length')

364

return None

365

elif name == 'server':

366

if record.http_headers:

367

return record.http_headers.get_header('Server')

368

return None

369

370

# Fallback to parent implementation

371

return super().get_field(record, name, it, filename)

372

373

# Use custom indexer

374

custom_fields = ['url', 'type', 'date', 'has_http_headers', 'content_length', 'server']

375

indexer = CustomIndexer(

376

fields=custom_fields,

377

inputs=['example.warc.gz'],

378

output=sys.stdout,

379

verify_http=True

380

)

381

indexer.process_all()

382

```

383

384

### Integration with Archive Processing

385

386

```python

387

from warcio.checker import Checker

388

from warcio.recompressor import Recompressor

389

from warcio.indexer import Indexer

390

import os

391

import tempfile

392

393

def process_archive_pipeline(input_file):

394

"""Complete pipeline for processing a WARC archive."""

395

396

# Step 1: Check integrity

397

print(f"Checking {input_file}...")

398

class SimpleCommand:

399

def __init__(self, files):

400

self.files = files

401

402

checker = Checker(SimpleCommand([input_file]))

403

404

try:

405

checker.process_one(input_file)

406

print("✓ Integrity check passed")

407

processed_file = input_file

408

except Exception as e:

409

print(f"⚠ Integrity issues detected: {e}")

410

411

# Step 2: Try to fix with recompression

412

print("Attempting to fix with recompression...")

413

temp_file = tempfile.mktemp(suffix='.warc.gz')

414

415

recompressor = Recompressor(input_file, temp_file, verbose=True)

416

recompressor.recompress()

417

418

# Verify the fixed file

419

checker.process_one(temp_file)

420

print("✓ File fixed and verified")

421

processed_file = temp_file

422

423

# Step 3: Create index

424

print("Creating index...")

425

index_file = input_file.replace('.warc.gz', '_index.jsonl')

426

427

with open(index_file, 'w') as output:

428

indexer = Indexer(

429

fields=['offset', 'length', 'url', 'mime', 'status'],

430

inputs=[processed_file],

431

output=output,

432

verify_http=True

433

)

434

indexer.process_all()

435

436

print(f"✓ Index created: {index_file}")

437

438

# Cleanup temporary file if created

439

if processed_file != input_file:

440

os.unlink(processed_file)

441

442

return index_file

443

444

# Process an archive

445

index_file = process_archive_pipeline('example.warc.gz')

446

print(f"Pipeline complete. Index available at: {index_file}")

447

```