0
# Request Management
1
2
Advanced request lifecycle management with support for static lists, dynamic queues, and tandem operations. Request management components provide flexible ways to feed requests to crawlers and manage request processing workflows.
3
4
## Capabilities
5
6
### Request Loader
7
8
Abstract base class for loading and providing requests to crawlers.
9
10
```python { .api }
11
class RequestLoader:
12
async def get_total_count(self) -> int:
13
"""
14
Get total number of requests.
15
16
Returns:
17
Total request count
18
"""
19
20
async def is_finished(self) -> bool:
21
"""
22
Check if all requests have been processed.
23
24
Returns:
25
True if no more requests available
26
"""
27
28
async def is_empty(self) -> bool:
29
"""
30
Check if loader has no pending requests.
31
32
Returns:
33
True if no pending requests
34
"""
35
36
async def load_request(self) -> Request | None:
37
"""
38
Load next request for processing.
39
40
Returns:
41
Request object or None if no more requests
42
"""
43
44
async def mark_request_handled(self, request: Request) -> None:
45
"""Mark request as successfully processed."""
46
47
async def reclaim_request(self, request: Request) -> None:
48
"""Return request to loader for retry."""
49
```
50
51
### Request List
52
53
Static list of requests that can be processed in order with built-in state persistence.
54
55
```python { .api }
56
class RequestList(RequestLoader):
57
def __init__(
58
self,
59
requests: list[str | Request],
60
*,
61
persist_state_key: str | None = None,
62
persist_state_key_value_store_id: str | None = None
63
): ...
64
65
@classmethod
66
async def open(
67
cls,
68
name: str | None = None,
69
*,
70
requests: list[str | Request],
71
persist_state_key: str | None = None,
72
**kwargs
73
) -> RequestList:
74
"""
75
Open or create request list with persistence.
76
77
Args:
78
name: List identifier for persistence
79
requests: List of requests to process
80
persist_state_key: Key for state persistence
81
82
Returns:
83
RequestList instance
84
"""
85
86
async def add_request(self, request: str | Request) -> None:
87
"""Add request to the list."""
88
89
async def add_requests(self, requests: list[str | Request]) -> None:
90
"""Add multiple requests to the list."""
91
92
async def get_state(self) -> RequestListState:
93
"""Get current state for persistence."""
94
95
async def persist_state(self) -> None:
96
"""Save state to persistent storage."""
97
98
async def initialize(self) -> None:
99
"""Initialize list and restore state if configured."""
100
101
@property
102
def length(self) -> int:
103
"""Total number of requests in list."""
104
105
@property
106
def processed_count(self) -> int:
107
"""Number of processed requests."""
108
109
@property
110
def pending_count(self) -> int:
111
"""Number of pending requests."""
112
```
113
114
### Request Manager
115
116
Dynamic request management with automatic queue and list coordination.
117
118
```python { .api }
119
class RequestManager(RequestLoader):
120
def __init__(
121
self,
122
*,
123
request_queue: RequestQueue | None = None,
124
request_list: RequestList | None = None,
125
max_cached_requests: int = 1000
126
): ...
127
128
@classmethod
129
async def open(
130
cls,
131
name: str | None = None,
132
*,
133
request_queue: RequestQueue | None = None,
134
request_list: RequestList | None = None,
135
**kwargs
136
) -> RequestManager:
137
"""
138
Open or create request manager.
139
140
Args:
141
name: Manager identifier
142
request_queue: Queue for dynamic requests
143
request_list: List of static requests
144
145
Returns:
146
RequestManager instance
147
"""
148
149
async def add_request(
150
self,
151
request: str | Request,
152
*,
153
forefront: bool = False
154
) -> None:
155
"""
156
Add request to manager.
157
158
Args:
159
request: Request to add
160
forefront: Add to front for priority processing
161
"""
162
163
async def add_requests(
164
self,
165
requests: list[str | Request],
166
*,
167
forefront: bool = False
168
) -> None:
169
"""Add multiple requests to manager."""
170
171
async def get_handled_count(self) -> int:
172
"""Get number of handled requests."""
173
174
async def initialize(self) -> None:
175
"""Initialize manager components."""
176
177
async def teardown(self) -> None:
178
"""Clean up manager resources."""
179
```
180
181
### Request Manager Tandem
182
183
Tandem system coordinating between request list and queue for hybrid request processing.
184
185
```python { .api }
186
class RequestManagerTandem(RequestLoader):
187
def __init__(
188
self,
189
*,
190
request_list: RequestList,
191
request_queue: RequestQueue
192
): ...
193
194
@classmethod
195
async def open(
196
cls,
197
*,
198
request_list: RequestList,
199
request_queue: RequestQueue
200
) -> RequestManagerTandem:
201
"""
202
Create tandem manager with list and queue.
203
204
Args:
205
request_list: Static request list
206
request_queue: Dynamic request queue
207
208
Returns:
209
RequestManagerTandem instance
210
"""
211
212
async def add_request(
213
self,
214
request: str | Request,
215
*,
216
forefront: bool = False
217
) -> None:
218
"""Add request to queue (dynamic requests)."""
219
220
async def add_requests_batched(
221
self,
222
requests: list[str | Request],
223
*,
224
forefront: bool = False
225
) -> None:
226
"""Add multiple requests to queue in batch."""
227
228
@property
229
def request_list(self) -> RequestList:
230
"""Access to request list component."""
231
232
@property
233
def request_queue(self) -> RequestQueue:
234
"""Access to request queue component."""
235
```
236
237
## State Management
238
239
### Request List State
240
241
State information for request list persistence and recovery.
242
243
```python { .api }
244
class RequestListState:
245
def __init__(
246
self,
247
*,
248
next_index: int = 0,
249
next_unique_key: str | None = None,
250
in_progress: dict[str, Request] | None = None
251
): ...
252
253
@property
254
def next_index(self) -> int:
255
"""Index of next request to process."""
256
257
@property
258
def next_unique_key(self) -> str | None:
259
"""Unique key of next request."""
260
261
@property
262
def in_progress(self) -> dict[str, Request]:
263
"""Requests currently being processed."""
264
265
    def to_dict(self) -> dict[str, Any]:
266
"""Serialize state to dictionary."""
267
268
@classmethod
269
    def from_dict(cls, data: dict[str, Any]) -> RequestListState:
270
"""Restore state from dictionary."""
271
```
272
273
## Usage Examples
274
275
### Static Request List
276
277
```python
import asyncio
from crawlee import Request
from crawlee.request_loaders import RequestList
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

async def main():
    # Create list of requests to process
    requests = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
        Request.from_url('https://example.com/api', user_data={'type': 'api'}),
    ]
290
291
# Create request list with persistence
292
request_list = await RequestList.open(
293
name='my-crawl',
294
requests=requests,
295
persist_state_key='crawl-state'
296
)
297
298
crawler = HttpCrawler()
299
300
@crawler.router.default_handler
301
async def handler(context: HttpCrawlingContext):
302
print(f"Processing: {context.request.url}")
303
304
data = {
305
'url': context.request.url,
306
'status': context.response.status_code,
307
'user_data': context.request.user_data
308
}
309
310
await context.push_data(data)
311
312
# Process requests from list
313
while not await request_list.is_empty():
314
request = await request_list.load_request()
315
if request:
316
try:
317
# Process request with crawler
318
await crawler._handle_request(request)
319
await request_list.mark_request_handled(request)
320
321
except Exception as e:
322
print(f"Request failed: {e}")
323
await request_list.reclaim_request(request)
324
325
print(f"Processed {request_list.processed_count} requests")
326
327
asyncio.run(main())
328
```
329
330
### Dynamic Request Manager
331
332
```python
333
import asyncio
334
from crawlee.request_loaders import RequestManager
335
from crawlee.storages import RequestQueue
336
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
337
338
async def main():
339
# Create request manager with queue
340
queue = await RequestQueue.open('crawl-queue')
341
342
manager = await RequestManager.open(
343
name='dynamic-crawl',
344
request_queue=queue
345
)
346
347
crawler = HttpCrawler()
348
349
@crawler.router.default_handler
350
async def handler(context: HttpCrawlingContext):
351
print(f"Processing: {context.request.url}")
352
353
# Extract links and add them dynamically
354
# This is simplified - normally you'd parse HTML
355
if 'page1' in context.request.url:
356
await manager.add_requests([
357
'https://example.com/page2',
358
'https://example.com/page3'
359
])
360
361
data = {
362
'url': context.request.url,
363
'status': context.response.status_code
364
}
365
366
await context.push_data(data)
367
368
# Start with seed requests
369
await manager.add_requests([
370
'https://example.com/page1',
371
'https://example.com/start'
372
])
373
374
# Process requests dynamically
375
while not await manager.is_finished():
376
request = await manager.load_request()
377
if request:
378
try:
379
await crawler._handle_request(request)
380
await manager.mark_request_handled(request)
381
382
except Exception as e:
383
print(f"Request failed: {e}")
384
await manager.reclaim_request(request)
385
386
await asyncio.sleep(0.1) # Prevent tight loop
387
388
handled_count = await manager.get_handled_count()
389
print(f"Processed {handled_count} requests")
390
391
await manager.teardown()
392
393
asyncio.run(main())
394
```
395
396
### Tandem Request Processing
397
398
```python
399
import asyncio
400
from crawlee.request_loaders import RequestList, RequestManagerTandem
401
from crawlee.storages import RequestQueue
402
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
403
404
async def main():
405
# Static list of seed requests
406
seed_requests = [
407
'https://example.com/',
408
'https://example.com/products',
409
'https://example.com/about'
410
]
411
412
request_list = await RequestList.open(
413
name='seed-requests',
414
requests=seed_requests
415
)
416
417
# Dynamic queue for discovered requests
418
request_queue = await RequestQueue.open('discovered-requests')
419
420
# Create tandem manager
421
tandem = await RequestManagerTandem.open(
422
request_list=request_list,
423
request_queue=request_queue
424
)
425
426
crawler = HttpCrawler()
427
428
@crawler.router.default_handler
429
async def handler(context: HttpCrawlingContext):
430
url = context.request.url
431
print(f"Processing: {url}")
432
433
# Simulate link discovery and addition to queue
434
if '/products' in url:
435
# Add product pages to dynamic queue
436
await tandem.add_requests_batched([
437
f'https://example.com/product/{i}' for i in range(1, 6)
438
])
439
440
elif '/product/' in url:
441
# Add related products
442
product_id = url.split('/')[-1]
443
await tandem.add_request(f'https://example.com/product/{product_id}/reviews')
444
445
data = {
446
'url': url,
447
'source': 'seed' if url in seed_requests else 'discovered',
448
'status': context.response.status_code
449
}
450
451
await context.push_data(data)
452
453
# Process both static and dynamic requests
454
processed = 0
455
while not await tandem.is_finished():
456
request = await tandem.load_request()
457
if request:
458
try:
459
await crawler._handle_request(request)
460
await tandem.mark_request_handled(request)
461
processed += 1
462
463
except Exception as e:
464
print(f"Request failed: {e}")
465
await tandem.reclaim_request(request)
466
467
if processed % 10 == 0:
468
list_pending = tandem.request_list.pending_count
469
queue_info = await tandem.request_queue.get_info()
470
print(f"Processed: {processed}, List pending: {list_pending}, Queue pending: {queue_info.pending_request_count}")
471
472
print(f"Total processed: {processed}")
473
474
asyncio.run(main())
475
```
476
477
### Request List with State Persistence
478
479
```python
480
import asyncio
481
from crawlee.request_loaders import RequestList
482
from crawlee.storages import KeyValueStore
483
484
async def simulate_crawl_interruption():
485
"""Simulate a crawl that gets interrupted and resumed."""
486
487
print("=== Starting initial crawl ===")
488
489
# Create request list with persistence
490
requests = [f'https://example.com/page{i}' for i in range(1, 21)]
491
492
request_list = await RequestList.open(
493
name='persistent-crawl',
494
requests=requests,
495
persist_state_key='crawl-progress'
496
)
497
498
# Process some requests, then simulate interruption
499
processed = 0
500
target_before_interruption = 10
501
502
while not await request_list.is_empty() and processed < target_before_interruption:
503
request = await request_list.load_request()
504
if request:
505
print(f"Processing: {request.url}")
506
await asyncio.sleep(0.1) # Simulate work
507
508
# Mark as handled
509
await request_list.mark_request_handled(request)
510
processed += 1
511
512
print(f"Processed {processed} requests before interruption")
513
print(f"Remaining: {request_list.pending_count}")
514
515
# Persist state before "crash"
516
await request_list.persist_state()
517
print("State persisted")
518
519
print("\n=== Simulating application restart ===")
520
521
# Create new request list instance (simulating restart)
522
new_request_list = await RequestList.open(
523
name='persistent-crawl',
524
requests=requests, # Same requests as before
525
persist_state_key='crawl-progress' # Same persistence key
526
)
527
528
print(f"After restart - Processed: {new_request_list.processed_count}")
529
print(f"After restart - Pending: {new_request_list.pending_count}")
530
531
# Continue processing from where we left off
532
while not await new_request_list.is_empty():
533
request = await new_request_list.load_request()
534
if request:
535
print(f"Resuming: {request.url}")
536
await asyncio.sleep(0.1)
537
538
await new_request_list.mark_request_handled(request)
539
processed += 1
540
541
print(f"Total processed: {processed}")
542
print("Crawl completed successfully after restart")
543
544
asyncio.run(simulate_crawl_interruption())
545
```
546
547
### Custom Request Loader
548
549
```python
550
import asyncio
from crawlee.request_loaders import RequestLoader
from crawlee import Request
554
555
class PriorityRequestLoader(RequestLoader):
556
"""Custom request loader with priority queuing."""
557
558
def __init__(self, requests: list[tuple[int, str | Request]]):
559
"""
560
Initialize with priority-request tuples.
561
562
Args:
563
requests: List of (priority, request) tuples (lower number = higher priority)
564
"""
565
import heapq
566
567
self.heap = []
568
self.handled = set()
569
self.in_progress = {}
570
self.total_count = len(requests)
571
572
# Build priority heap
573
for priority, request in requests:
574
if isinstance(request, str):
575
                request = Request.from_url(request)
576
577
heapq.heappush(self.heap, (priority, request.unique_key, request))
578
579
async def get_total_count(self) -> int:
580
return self.total_count
581
582
async def is_finished(self) -> bool:
583
return len(self.handled) == self.total_count
584
585
async def is_empty(self) -> bool:
586
return not self.heap and not self.in_progress
587
588
async def load_request(self) -> Request | None:
589
if not self.heap:
590
return None
591
592
import heapq
593
priority, unique_key, request = heapq.heappop(self.heap)
594
595
# Track as in progress
596
self.in_progress[unique_key] = request
597
598
return request
599
600
async def mark_request_handled(self, request: Request) -> None:
601
unique_key = request.unique_key
602
603
if unique_key in self.in_progress:
604
del self.in_progress[unique_key]
605
self.handled.add(unique_key)
606
607
async def reclaim_request(self, request: Request) -> None:
608
unique_key = request.unique_key
609
610
if unique_key in self.in_progress:
611
            # Re-queue at top priority for retry (simplified - could preserve the original priority)
612
import heapq
613
heapq.heappush(self.heap, (0, unique_key, request)) # High priority for retry
614
del self.in_progress[unique_key]
615
616
async def main():
617
# Create requests with priorities (lower number = higher priority)
618
priority_requests = [
619
(1, 'https://example.com/important'), # High priority
620
(5, 'https://example.com/page1'), # Low priority
621
(1, 'https://example.com/urgent'), # High priority
622
(3, 'https://example.com/page2'), # Medium priority
623
(5, 'https://example.com/page3'), # Low priority
624
]
625
626
loader = PriorityRequestLoader(priority_requests)
627
628
print("Processing requests by priority:")
629
while not await loader.is_finished():
630
request = await loader.load_request()
631
if request:
632
print(f"Processing: {request.url}")
633
634
# Simulate processing
635
await asyncio.sleep(0.1)
636
637
# Occasionally fail to test retry
638
if 'page2' in request.url:
639
print(f" Failed: {request.url}")
640
await loader.reclaim_request(request)
641
else:
642
print(f" Completed: {request.url}")
643
await loader.mark_request_handled(request)
644
645
print("All requests processed")
646
647
asyncio.run(main())
648
```