
# Request Management

Advanced request lifecycle management with support for static lists, dynamic queues, and tandem operations. Request management components provide flexible ways to feed requests to crawlers and manage request processing workflows.

## Capabilities

### Request Loader

Abstract base class for loading and providing requests to crawlers.

```python { .api }
class RequestLoader:
    async def get_total_count(self) -> int:
        """
        Get total number of requests.

        Returns:
            Total request count
        """

    async def is_finished(self) -> bool:
        """
        Check if all requests have been processed.

        Returns:
            True if no more requests available
        """

    async def is_empty(self) -> bool:
        """
        Check if loader has no pending requests.

        Returns:
            True if no pending requests
        """

    async def load_request(self) -> Request | None:
        """
        Load next request for processing.

        Returns:
            Request object or None if no more requests
        """

    async def mark_request_handled(self, request: Request) -> None:
        """Mark request as successfully processed."""

    async def reclaim_request(self, request: Request) -> None:
        """Return request to loader for retry."""
```

### Request List

Static list of requests that can be processed in order with built-in state persistence.

```python { .api }
class RequestList(RequestLoader):
    def __init__(
        self,
        requests: list[str | Request],
        *,
        persist_state_key: str | None = None,
        persist_state_key_value_store_id: str | None = None
    ): ...

    @classmethod
    async def open(
        cls,
        name: str | None = None,
        *,
        requests: list[str | Request],
        persist_state_key: str | None = None,
        **kwargs
    ) -> RequestList:
        """
        Open or create request list with persistence.

        Args:
            name: List identifier for persistence
            requests: List of requests to process
            persist_state_key: Key for state persistence

        Returns:
            RequestList instance
        """

    async def add_request(self, request: str | Request) -> None:
        """Add request to the list."""

    async def add_requests(self, requests: list[str | Request]) -> None:
        """Add multiple requests to the list."""

    async def get_state(self) -> RequestListState:
        """Get current state for persistence."""

    async def persist_state(self) -> None:
        """Save state to persistent storage."""

    async def initialize(self) -> None:
        """Initialize list and restore state if configured."""

    @property
    def length(self) -> int:
        """Total number of requests in list."""

    @property
    def processed_count(self) -> int:
        """Number of processed requests."""

    @property
    def pending_count(self) -> int:
        """Number of pending requests."""
```

### Request Manager

Dynamic request management with automatic queue and list coordination.

```python { .api }
class RequestManager(RequestLoader):
    def __init__(
        self,
        *,
        request_queue: RequestQueue | None = None,
        request_list: RequestList | None = None,
        max_cached_requests: int = 1000
    ): ...

    @classmethod
    async def open(
        cls,
        name: str | None = None,
        *,
        request_queue: RequestQueue | None = None,
        request_list: RequestList | None = None,
        **kwargs
    ) -> RequestManager:
        """
        Open or create request manager.

        Args:
            name: Manager identifier
            request_queue: Queue for dynamic requests
            request_list: List of static requests

        Returns:
            RequestManager instance
        """

    async def add_request(
        self,
        request: str | Request,
        *,
        forefront: bool = False
    ) -> None:
        """
        Add request to manager.

        Args:
            request: Request to add
            forefront: Add to front for priority processing
        """

    async def add_requests(
        self,
        requests: list[str | Request],
        *,
        forefront: bool = False
    ) -> None:
        """Add multiple requests to manager."""

    async def get_handled_count(self) -> int:
        """Get number of handled requests."""

    async def initialize(self) -> None:
        """Initialize manager components."""

    async def teardown(self) -> None:
        """Clean up manager resources."""
```

### Request Manager Tandem

Tandem system coordinating between request list and queue for hybrid request processing.

```python { .api }
class RequestManagerTandem(RequestLoader):
    def __init__(
        self,
        *,
        request_list: RequestList,
        request_queue: RequestQueue
    ): ...

    @classmethod
    async def open(
        cls,
        *,
        request_list: RequestList,
        request_queue: RequestQueue
    ) -> RequestManagerTandem:
        """
        Create tandem manager with list and queue.

        Args:
            request_list: Static request list
            request_queue: Dynamic request queue

        Returns:
            RequestManagerTandem instance
        """

    async def add_request(
        self,
        request: str | Request,
        *,
        forefront: bool = False
    ) -> None:
        """Add request to queue (dynamic requests)."""

    async def add_requests_batched(
        self,
        requests: list[str | Request],
        *,
        forefront: bool = False
    ) -> None:
        """Add multiple requests to queue in batch."""

    @property
    def request_list(self) -> RequestList:
        """Access to request list component."""

    @property
    def request_queue(self) -> RequestQueue:
        """Access to request queue component."""
```

## State Management

### Request List State

State information for request list persistence and recovery.

```python { .api }
class RequestListState:
    def __init__(
        self,
        *,
        next_index: int = 0,
        next_unique_key: str | None = None,
        in_progress: dict[str, Request] | None = None
    ): ...

    @property
    def next_index(self) -> int:
        """Index of next request to process."""

    @property
    def next_unique_key(self) -> str | None:
        """Unique key of next request."""

    @property
    def in_progress(self) -> dict[str, Request]:
        """Requests currently being processed."""

    def to_dict(self) -> dict[str, Any]:
        """Serialize state to dictionary."""

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> RequestListState:
        """Restore state from dictionary."""
```

## Usage Examples

### Static Request List

```python
import asyncio
from crawlee.request_loaders import RequestList
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

async def main():
    # Create list of requests to process
    requests = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
        {'url': 'https://example.com/api', 'userData': {'type': 'api'}},
    ]

    # Create request list with persistence
    request_list = await RequestList.open(
        name='my-crawl',
        requests=requests,
        persist_state_key='crawl-state'
    )

    crawler = HttpCrawler()

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        print(f"Processing: {context.request.url}")

        data = {
            'url': context.request.url,
            'status': context.response.status_code,
            'user_data': context.request.user_data
        }

        await context.push_data(data)

    # Process requests from list
    while not await request_list.is_empty():
        request = await request_list.load_request()
        if request:
            try:
                # Process request with crawler
                await crawler._handle_request(request)
                await request_list.mark_request_handled(request)

            except Exception as e:
                print(f"Request failed: {e}")
                await request_list.reclaim_request(request)

    print(f"Processed {request_list.processed_count} requests")

asyncio.run(main())
```

### Dynamic Request Manager

```python
import asyncio
from crawlee.request_loaders import RequestManager
from crawlee.storages import RequestQueue
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

async def main():
    # Create request manager with queue
    queue = await RequestQueue.open('crawl-queue')

    manager = await RequestManager.open(
        name='dynamic-crawl',
        request_queue=queue
    )

    crawler = HttpCrawler()

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        print(f"Processing: {context.request.url}")

        # Extract links and add them dynamically
        # This is simplified - normally you'd parse HTML
        if 'page1' in context.request.url:
            await manager.add_requests([
                'https://example.com/page2',
                'https://example.com/page3'
            ])

        data = {
            'url': context.request.url,
            'status': context.response.status_code
        }

        await context.push_data(data)

    # Start with seed requests
    await manager.add_requests([
        'https://example.com/page1',
        'https://example.com/start'
    ])

    # Process requests dynamically
    while not await manager.is_finished():
        request = await manager.load_request()
        if request:
            try:
                await crawler._handle_request(request)
                await manager.mark_request_handled(request)

            except Exception as e:
                print(f"Request failed: {e}")
                await manager.reclaim_request(request)

        await asyncio.sleep(0.1)  # Prevent tight loop

    handled_count = await manager.get_handled_count()
    print(f"Processed {handled_count} requests")

    await manager.teardown()

asyncio.run(main())
```

### Tandem Request Processing

```python
import asyncio
from crawlee.request_loaders import RequestList, RequestManagerTandem
from crawlee.storages import RequestQueue
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

async def main():
    # Static list of seed requests
    seed_requests = [
        'https://example.com/',
        'https://example.com/products',
        'https://example.com/about'
    ]

    request_list = await RequestList.open(
        name='seed-requests',
        requests=seed_requests
    )

    # Dynamic queue for discovered requests
    request_queue = await RequestQueue.open('discovered-requests')

    # Create tandem manager
    tandem = await RequestManagerTandem.open(
        request_list=request_list,
        request_queue=request_queue
    )

    crawler = HttpCrawler()

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        url = context.request.url
        print(f"Processing: {url}")

        # Simulate link discovery and addition to queue
        if '/products' in url:
            # Add product pages to dynamic queue
            await tandem.add_requests_batched([
                f'https://example.com/product/{i}' for i in range(1, 6)
            ])

        elif '/product/' in url:
            # Add related products
            product_id = url.split('/')[-1]
            await tandem.add_request(f'https://example.com/product/{product_id}/reviews')

        data = {
            'url': url,
            'source': 'seed' if url in seed_requests else 'discovered',
            'status': context.response.status_code
        }

        await context.push_data(data)

    # Process both static and dynamic requests
    processed = 0
    while not await tandem.is_finished():
        request = await tandem.load_request()
        if request:
            try:
                await crawler._handle_request(request)
                await tandem.mark_request_handled(request)
                processed += 1

            except Exception as e:
                print(f"Request failed: {e}")
                await tandem.reclaim_request(request)

            if processed % 10 == 0:
                list_pending = tandem.request_list.pending_count
                queue_info = await tandem.request_queue.get_info()
                print(f"Processed: {processed}, List pending: {list_pending}, Queue pending: {queue_info.pending_request_count}")

    print(f"Total processed: {processed}")

asyncio.run(main())
```

### Request List with State Persistence

```python
import asyncio
from crawlee.request_loaders import RequestList
from crawlee.storages import KeyValueStore

async def simulate_crawl_interruption():
    """Simulate a crawl that gets interrupted and resumed."""

    print("=== Starting initial crawl ===")

    # Create request list with persistence
    requests = [f'https://example.com/page{i}' for i in range(1, 21)]

    request_list = await RequestList.open(
        name='persistent-crawl',
        requests=requests,
        persist_state_key='crawl-progress'
    )

    # Process some requests, then simulate interruption
    processed = 0
    target_before_interruption = 10

    while not await request_list.is_empty() and processed < target_before_interruption:
        request = await request_list.load_request()
        if request:
            print(f"Processing: {request.url}")
            await asyncio.sleep(0.1)  # Simulate work

            # Mark as handled
            await request_list.mark_request_handled(request)
            processed += 1

    print(f"Processed {processed} requests before interruption")
    print(f"Remaining: {request_list.pending_count}")

    # Persist state before "crash"
    await request_list.persist_state()
    print("State persisted")

    print("\n=== Simulating application restart ===")

    # Create new request list instance (simulating restart)
    new_request_list = await RequestList.open(
        name='persistent-crawl',
        requests=requests,  # Same requests as before
        persist_state_key='crawl-progress'  # Same persistence key
    )

    print(f"After restart - Processed: {new_request_list.processed_count}")
    print(f"After restart - Pending: {new_request_list.pending_count}")

    # Continue processing from where we left off
    while not await new_request_list.is_empty():
        request = await new_request_list.load_request()
        if request:
            print(f"Resuming: {request.url}")
            await asyncio.sleep(0.1)

            await new_request_list.mark_request_handled(request)
            processed += 1

    print(f"Total processed: {processed}")
    print("Crawl completed successfully after restart")

asyncio.run(simulate_crawl_interruption())
```

### Custom Request Loader

```python
import asyncio
from typing import Iterator
from crawlee.request_loaders import RequestLoader
from crawlee import Request

class PriorityRequestLoader(RequestLoader):
    """Custom request loader with priority queuing."""

    def __init__(self, requests: list[tuple[int, str | Request]]):
        """
        Initialize with priority-request tuples.

        Args:
            requests: List of (priority, request) tuples (lower number = higher priority)
        """
        import heapq

        self.heap = []
        self.handled = set()
        self.in_progress = {}
        self.total_count = len(requests)

        # Build priority heap
        for priority, request in requests:
            if isinstance(request, str):
                request = Request(request)

            heapq.heappush(self.heap, (priority, request.unique_key, request))

    async def get_total_count(self) -> int:
        return self.total_count

    async def is_finished(self) -> bool:
        return len(self.handled) == self.total_count

    async def is_empty(self) -> bool:
        return not self.heap and not self.in_progress

    async def load_request(self) -> Request | None:
        if not self.heap:
            return None

        import heapq
        priority, unique_key, request = heapq.heappop(self.heap)

        # Track as in progress
        self.in_progress[unique_key] = request

        return request

    async def mark_request_handled(self, request: Request) -> None:
        unique_key = request.unique_key

        if unique_key in self.in_progress:
            del self.in_progress[unique_key]
            self.handled.add(unique_key)

    async def reclaim_request(self, request: Request) -> None:
        unique_key = request.unique_key

        if unique_key in self.in_progress:
            # Return to heap with same priority (simplified - could implement retry logic)
            import heapq
            heapq.heappush(self.heap, (0, unique_key, request))  # High priority for retry
            del self.in_progress[unique_key]

async def main():
    # Create requests with priorities (lower number = higher priority)
    priority_requests = [
        (1, 'https://example.com/important'),  # High priority
        (5, 'https://example.com/page1'),      # Low priority
        (1, 'https://example.com/urgent'),     # High priority
        (3, 'https://example.com/page2'),      # Medium priority
        (5, 'https://example.com/page3'),      # Low priority
    ]

    loader = PriorityRequestLoader(priority_requests)

    print("Processing requests by priority:")
    retried = set()  # Track URLs already retried so the demo terminates
    while not await loader.is_finished():
        request = await loader.load_request()
        if request:
            print(f"Processing: {request.url}")

            # Simulate processing
            await asyncio.sleep(0.1)

            # Fail once to test retry, then succeed on the second attempt
            if 'page2' in request.url and request.url not in retried:
                retried.add(request.url)
                print(f" Failed: {request.url}")
                await loader.reclaim_request(request)
            else:
                print(f" Completed: {request.url}")
                await loader.mark_request_handled(request)

    print("All requests processed")

asyncio.run(main())
```