or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

archive-reading.md, cli-tools.md, http-capture.md, http-headers.md, index.md, stream-processing.md, time-utilities.md, warc-writing.md

docs/stream-processing.md

0

# Stream Processing

1

2

Advanced stream processing capabilities with compression, digest verification, and buffered reading for efficient handling of large archive files and streaming data sources.

3

4

## Capabilities

5

6

### Buffered Reader

7

8

Core buffered reader with optional decompression support for efficient stream processing.

9

10

```python { .api }

11

class BufferedReader:

12

def __init__(self, stream, block_size=16384, decomp_type=None,

13

starting_data=None, read_all_members=False):

14

"""

15

Buffered reader with optional decompression.

16

17

Args:

18

stream: Source stream to read from

19

block_size (int): Buffer size in bytes (default 16384)

20

decomp_type (str): Decompression type ('gzip', 'deflate', 'brotli', or None)

21

starting_data (bytes): Pre-read data to include in buffer

22

read_all_members (bool): Whether to read all compression members

23

"""

24

25

def read(self, length=None):

26

"""

27

Read data with buffering and optional decompression.

28

29

Args:

30

length (int): Number of bytes to read (None for all available)

31

32

Returns:

33

bytes: Read data

34

"""

35

36

def readline(self, length=None):

37

"""

38

Read a line with buffering and optional decompression.

39

40

Args:

41

length (int): Maximum line length (None for unlimited)

42

43

Returns:

44

bytes: Line data including line ending

45

"""

46

47

def tell(self):

48

"""

49

Get current position in buffer.

50

51

Returns:

52

int: Current buffer position

53

"""

54

55

def empty(self):

56

"""

57

Check if buffer is empty.

58

59

Returns:

60

bool: True if buffer has no more data

61

"""

62

63

def read_next_member(self):

64

"""

65

Move to next compression member (for multi-member files).

66

67

Returns:

68

bool: True if next member found, False if at end

69

"""

70

71

def rem_length(self):

72

"""

73

Get remaining buffer length.

74

75

Returns:

76

int: Number of bytes remaining in buffer

77

"""

78

79

def close(self):

80

"""Close the reader and cleanup resources."""

81

82

def set_decomp(self, decomp_type):

83

"""

84

Set or change decompression type.

85

86

Args:

87

decomp_type (str): Decompression type ('gzip', 'deflate', 'brotli', or None)

88

"""

89

90

@classmethod

91

def get_supported_decompressors(cls):

92

"""

93

Get list of supported decompression types.

94

95

Returns:

96

list: Available decompression types

97

"""

98

```

99

100

### Decompressing Buffered Reader

101

102

Specialized buffered reader that defaults to gzip decompression.

103

104

```python { .api }

105

class DecompressingBufferedReader(BufferedReader):

106

def __init__(self, stream, **kwargs):

107

"""

108

BufferedReader that defaults to gzip decompression.

109

110

Args:

111

stream: Source stream to read from

112

**kwargs: Additional arguments passed to BufferedReader

113

"""

114

```

115

116

### Chunked Data Reader

117

118

Buffered reader with HTTP chunked encoding support for handling chunked transfer encoding.

119

120

```python { .api }

121

class ChunkedDataReader(BufferedReader):

122

def __init__(self, stream, raise_exceptions=False, **kwargs):

123

"""

124

BufferedReader with HTTP chunked encoding support.

125

126

Args:

127

stream: Source stream with chunked data

128

raise_exceptions (bool): Whether to raise exceptions on chunk errors

129

**kwargs: Additional arguments passed to BufferedReader

130

"""

131

132

class ChunkedDataException(Exception):

133

def __init__(self, msg, data=b''):

134

"""

135

Exception for chunked data parsing errors.

136

137

Args:

138

msg (str): Error message

139

data (bytes): Problematic data chunk

140

"""

141

```

142

143

### Limit Reader

144

145

Reader that enforces byte limits for controlled reading of stream portions.

146

147

```python { .api }

148

class LimitReader:

149

def __init__(self, stream, limit):

150

"""

151

Reader that limits reading to specified byte count.

152

153

Args:

154

stream: Source stream to read from

155

limit (int): Maximum number of bytes to read

156

"""

157

158

def read(self, length=None):

159

"""

160

Read data with limit enforcement.

161

162

Args:

163

length (int): Number of bytes to read (limited by remaining quota)

164

165

Returns:

166

bytes: Read data (may be less than requested due to limit)

167

"""

168

169

def readline(self, length=None):

170

"""

171

Read line with limit enforcement.

172

173

Args:

174

length (int): Maximum line length

175

176

Returns:

177

bytes: Line data (may be truncated due to limit)

178

"""

179

180

def tell(self):

181

"""

182

Get position within limited stream.

183

184

Returns:

185

int: Number of bytes read so far

186

"""

187

188

def close(self):

189

"""Close underlying stream."""

190

191

@staticmethod

192

def wrap_stream(stream, content_length):

193

"""

194

Wrap stream with LimitReader if content_length is specified.

195

196

Args:

197

stream: Stream to potentially wrap

198

content_length (int or None): Content length limit

199

200

Returns:

201

Stream or LimitReader: Original stream or wrapped with limit

202

"""

203

```

204

205

### Digest Verifying Reader

206

207

Reader that verifies digests while reading data, extending LimitReader with digest validation.

208

209

```python { .api }

210

class DigestVerifyingReader(LimitReader):

211

def __init__(self, stream, limit, digest_checker, record_type=None,

212

payload_digest=None, block_digest=None, segment_number=None):

213

"""

214

Reader that verifies digests while reading.

215

216

Args:

217

stream: Source stream to read from

218

limit (int): Maximum bytes to read

219

digest_checker: DigestChecker instance for validation

220

record_type (str): Type of record being read

221

payload_digest (str): Expected payload digest

222

block_digest (str): Expected block digest

223

segment_number (int): Segment number for multi-part records

224

"""

225

226

def begin_payload(self):

227

"""Mark beginning of payload for digest calculation."""

228

229

class DigestChecker:

230

def __init__(self, kind=None):

231

"""

232

Tracks digest verification results.

233

234

Args:

235

kind (str): Type of digest checking being performed

236

"""

237

238

@property

239

def passed(self):

240

"""

241

Whether all digests passed verification.

242

243

Returns:

244

bool: True if all digests verified successfully

245

"""

246

247

@property

248

def problems(self):

249

"""

250

List of problems encountered during verification.

251

252

Returns:

253

list: Problem descriptions

254

"""

255

256

def problem(self, value, passed=False):

257

"""

258

Record a verification problem.

259

260

Args:

261

value (str): Description of the problem

262

passed (bool): Whether this should be considered a pass despite the problem

263

"""

264

```

265

266

### Decompression Utilities

267

268

Utility functions for creating and managing decompressors.

269

270

```python { .api }

271

def gzip_decompressor():

272

"""

273

Create a gzip decompressor.

274

275

Returns:

276

Decompressor object for gzip data

277

"""

278

279

def deflate_decompressor():

280

"""

281

Create a deflate decompressor.

282

283

Returns:

284

Decompressor object for deflate data

285

"""

286

287

def deflate_decompressor_alt():

288

"""

289

Create alternative deflate decompressor with different window size.

290

291

Returns:

292

Decompressor object for deflate data (alternative settings)

293

"""

294

295

def try_brotli_init():

296

"""

297

Initialize brotli decompression support if available.

298

299

Returns:

300

bool: True if brotli is available and initialized

301

"""

302

```

303

304

### Hash Digest Utilities

305

306

Utility classes for computing and managing hash digests during stream processing.

307

308

```python { .api }

309

class Digester:

310

def __init__(self, type_='sha1'):

311

"""

312

Hash digest calculator for stream data.

313

314

Args:

315

type_ (str): Hash algorithm type ('sha1', 'md5', 'sha256', etc.)

316

"""

317

318

def update(self, buff):

319

"""

320

Update hash with new data.

321

322

Args:

323

buff (bytes): Data to add to hash calculation

324

"""

325

326

def __str__(self):

327

"""

328

Get final hash digest as string.

329

330

Returns:

331

str: Hash digest in format 'algorithm:hexdigest'

332

"""

333

```

334

335

## Usage Examples

336

337

### Basic Buffered Reading

338

339

```python

340

from warcio.bufferedreaders import BufferedReader

341

import io

342

343

# Create buffered reader for efficient reading

344

data = b"Hello, World!" * 1000

345

stream = io.BytesIO(data)

346

reader = BufferedReader(stream, block_size=4096)

347

348

# Read data in chunks

349

chunk1 = reader.read(100)

350

chunk2 = reader.read(200)

351

352

print(f"Read {len(chunk1)} + {len(chunk2)} bytes")

353

print(f"Buffer position: {reader.tell()}")

354

355

# Read lines

356

stream.seek(0)

357

reader = BufferedReader(stream)

358

line = reader.readline()

359

print(f"First line: {line}")

360

361

reader.close()

362

```

363

364

### Decompression Reading

365

366

```python

367

from warcio.bufferedreaders import DecompressingBufferedReader

368

import gzip

369

import io

370

371

# Create compressed data

372

original_data = b"This is some test data that will be compressed"

373

compressed_data = gzip.compress(original_data)

374

375

# Read with automatic decompression

376

stream = io.BytesIO(compressed_data)

377

reader = DecompressingBufferedReader(stream)

378

379

# Data is automatically decompressed

380

decompressed = reader.read()

381

print(f"Original: {len(original_data)} bytes")

382

print(f"Compressed: {len(compressed_data)} bytes")

383

print(f"Decompressed: {len(decompressed)} bytes")

384

print(f"Match: {decompressed == original_data}")

385

386

reader.close()

387

```

388

389

### Manual Decompression Setup

390

391

```python

392

from warcio.bufferedreaders import BufferedReader

393

import gzip

394

import io

395

396

# Create gzip compressed data

397

original_data = b"Manual decompression example data"

398

compressed_data = gzip.compress(original_data)

399

400

# Set up reader with manual decompression type

401

stream = io.BytesIO(compressed_data)

402

reader = BufferedReader(stream, decomp_type='gzip')

403

404

# Read decompressed data

405

result = reader.read()

406

print(f"Manually decompressed: {result == original_data}")

407

408

# Check supported decompression types

409

supported = BufferedReader.get_supported_decompressors()

410

print(f"Supported decompressors: {supported}")

411

412

reader.close()

413

```

414

415

### Chunked Data Reading

416

417

```python

418

from warcio.bufferedreaders import ChunkedDataReader, ChunkedDataException

419

import io

420

421

# Create HTTP chunked data

422

chunked_data = b"5\r\nHello\r\n6\r\n World\r\n0\r\n\r\n"

423

stream = io.BytesIO(chunked_data)

424

425

try:

426

reader = ChunkedDataReader(stream, raise_exceptions=True)

427

428

# Read dechunked data

429

result = reader.read()

430

print(f"Dechunked data: {result}") # b"Hello World"

431

432

except ChunkedDataException as e:

433

print(f"Chunked data error: {e}")

434

print(f"Problematic data: {e.data}")

435

finally:

436

reader.close()

437

```

438

439

### Limited Reading

440

441

```python

442

from warcio.limitreader import LimitReader

443

import io

444

445

# Create large data stream

446

large_data = b"x" * 10000

447

stream = io.BytesIO(large_data)

448

449

# Limit reading to first 100 bytes

450

limited_reader = LimitReader(stream, limit=100)

451

452

# Read data - will stop at limit

453

data = limited_reader.read()

454

print(f"Read {len(data)} bytes (limited to 100)")

455

print(f"Position: {limited_reader.tell()}")

456

457

# Trying to read more returns empty

458

more_data = limited_reader.read()

459

print(f"Additional read: {len(more_data)} bytes")

460

461

limited_reader.close()

462

```

463

464

### Automatic Stream Wrapping

465

466

```python

467

from warcio.limitreader import LimitReader

468

import io

469

470

# Test automatic wrapping

471

large_stream = io.BytesIO(b"x" * 1000)

472

473

# Wrap with limit if content length specified

474

wrapped = LimitReader.wrap_stream(large_stream, content_length=100)

475

print(f"Wrapped stream type: {type(wrapped).__name__}")

476

477

# No wrapping if no content length

478

unwrapped = LimitReader.wrap_stream(large_stream, content_length=None)

479

print(f"Unwrapped stream type: {type(unwrapped).__name__}")

480

```

481

482

### Digest Verification

483

484

```python

485

from warcio.digestverifyingreader import DigestVerifyingReader, DigestChecker

486

import io

487

import hashlib

488

489

# Create test data and calculate digest

490

test_data = b"This is test data for digest verification"

491

expected_digest = "sha1:" + hashlib.sha1(test_data).hexdigest()

492

493

# Set up digest checker

494

checker = DigestChecker(kind="test")

495

stream = io.BytesIO(test_data)

496

497

# Create verifying reader

498

verifying_reader = DigestVerifyingReader(

499

stream=stream,

500

limit=len(test_data),

501

digest_checker=checker,

502

payload_digest=expected_digest

503

)

504

505

# Begin payload reading (starts digest calculation)

506

verifying_reader.begin_payload()

507

508

# Read data - digest is calculated during reading

509

data = verifying_reader.read()

510

print(f"Read {len(data)} bytes")

511

512

# Check verification results

513

print(f"Digest verification passed: {checker.passed}")

514

if not checker.passed:

515

print(f"Problems: {checker.problems}")

516

517

verifying_reader.close()

518

```

519

520

### Multi-Member Compression

521

522

```python

523

from warcio.bufferedreaders import BufferedReader

524

import gzip

525

import io

526

527

# Create multi-member gzip data

528

data1 = b"First member data"

529

data2 = b"Second member data"

530

compressed1 = gzip.compress(data1)

531

compressed2 = gzip.compress(data2)

532

multi_member_data = compressed1 + compressed2

533

534

# Read multi-member file

535

stream = io.BytesIO(multi_member_data)

536

reader = BufferedReader(stream, decomp_type='gzip', read_all_members=True)

537

538

# Read first member

539

member1 = reader.read()

540

print(f"First member: {member1}")

541

542

# Move to next member

543

if reader.read_next_member():

544

member2 = reader.read()

545

print(f"Second member: {member2}")

546

else:

547

print("No second member found")

548

549

reader.close()

550

```

551

552

### Error Handling and Cleanup

553

554

```python

555

from warcio.bufferedreaders import BufferedReader, ChunkedDataException

556

from warcio.limitreader import LimitReader

557

import io

558

559

stream = io.BytesIO(b"test data")

560

561

try:

562

# Create readers

563

buffered = BufferedReader(stream)

564

limited = LimitReader(buffered, limit=50)

565

566

# Use readers

567

data = limited.read(10)

568

print(f"Successfully read {len(data)} bytes")

569

570

except Exception as e:

571

print(f"Error during reading: {e}")

572

573

finally:

574

# Always close readers to free resources

575

if 'limited' in locals():

576

limited.close()

577

if 'buffered' in locals():

578

buffered.close()

579

```