A comprehensive web scraping and browser automation library for Python with human-like behavior and bot protection bypass
Advanced request lifecycle management with support for static lists, dynamic queues, and tandem operation of both. The request management components provide flexible ways to feed requests to crawlers and to manage request-processing workflows.
Abstract base class for loading and providing requests to crawlers.
class RequestLoader:
async def get_total_count(self) -> int:
"""
Get total number of requests.
Returns:
Total request count
"""
async def is_finished(self) -> bool:
"""
Check if all requests have been processed.
Returns:
True if no more requests available
"""
async def is_empty(self) -> bool:
"""
Check if loader has no pending requests.
Returns:
True if no pending requests
"""
async def load_request(self) -> Request | None:
"""
Load next request for processing.
Returns:
Request object or None if no more requests
"""
async def mark_request_handled(self, request: Request) -> None:
"""Mark request as successfully processed."""
async def reclaim_request(self, request: Request) -> None:
"""Return request to loader for retry."""Static list of requests that can be processed in order with built-in state persistence.
Static list of requests that can be processed in order with built-in state persistence.
class RequestList(RequestLoader):
def __init__(
self,
requests: list[str | Request],
*,
persist_state_key: str | None = None,
persist_state_key_value_store_id: str | None = None
): ...
@classmethod
async def open(
cls,
name: str | None = None,
*,
requests: list[str | Request],
persist_state_key: str | None = None,
**kwargs
) -> RequestList:
"""
Open or create request list with persistence.
Args:
name: List identifier for persistence
requests: List of requests to process
persist_state_key: Key for state persistence
Returns:
RequestList instance
"""
async def add_request(self, request: str | Request) -> None:
"""Add request to the list."""
async def add_requests(self, requests: list[str | Request]) -> None:
"""Add multiple requests to the list."""
async def get_state(self) -> RequestListState:
"""Get current state for persistence."""
async def persist_state(self) -> None:
"""Save state to persistent storage."""
async def initialize(self) -> None:
"""Initialize list and restore state if configured."""
@property
def length(self) -> int:
"""Total number of requests in list."""
@property
def processed_count(self) -> int:
"""Number of processed requests."""
@property
def pending_count(self) -> int:
"""Number of pending requests."""Dynamic request management with automatic queue and list coordination.
class RequestManager(RequestLoader):
def __init__(
self,
*,
request_queue: RequestQueue | None = None,
request_list: RequestList | None = None,
max_cached_requests: int = 1000
): ...
@classmethod
async def open(
cls,
name: str | None = None,
*,
request_queue: RequestQueue | None = None,
request_list: RequestList | None = None,
**kwargs
) -> RequestManager:
"""
Open or create request manager.
Args:
name: Manager identifier
request_queue: Queue for dynamic requests
request_list: List of static requests
Returns:
RequestManager instance
"""
async def add_request(
self,
request: str | Request,
*,
forefront: bool = False
) -> None:
"""
Add request to manager.
Args:
request: Request to add
forefront: Add to front for priority processing
"""
async def add_requests(
self,
requests: list[str | Request],
*,
forefront: bool = False
) -> None:
"""Add multiple requests to manager."""
async def get_handled_count(self) -> int:
"""Get number of handled requests."""
async def initialize(self) -> None:
"""Initialize manager components."""
async def teardown(self) -> None:
"""Clean up manager resources."""Tandem system coordinating between request list and queue for hybrid request processing.
Tandem system coordinating between request list and queue for hybrid request processing.
class RequestManagerTandem(RequestLoader):
def __init__(
self,
*,
request_list: RequestList,
request_queue: RequestQueue
): ...
@classmethod
async def open(
cls,
*,
request_list: RequestList,
request_queue: RequestQueue
) -> RequestManagerTandem:
"""
Create tandem manager with list and queue.
Args:
request_list: Static request list
request_queue: Dynamic request queue
Returns:
RequestManagerTandem instance
"""
async def add_request(
self,
request: str | Request,
*,
forefront: bool = False
) -> None:
"""Add request to queue (dynamic requests)."""
async def add_requests_batched(
self,
requests: list[str | Request],
*,
forefront: bool = False
) -> None:
"""Add multiple requests to queue in batch."""
@property
def request_list(self) -> RequestList:
"""Access to request list component."""
@property
def request_queue(self) -> RequestQueue:
"""Access to request queue component."""State information for request list persistence and recovery.
class RequestListState:
def __init__(
self,
*,
next_index: int = 0,
next_unique_key: str | None = None,
in_progress: dict[str, Request] | None = None
): ...
@property
def next_index(self) -> int:
"""Index of next request to process."""
@property
def next_unique_key(self) -> str | None:
"""Unique key of next request."""
@property
def in_progress(self) -> dict[str, Request]:
"""Requests currently being processed."""
def to_dict(self) -> dict[str, Any]:
"""Serialize state to dictionary."""
@classmethod
def from_dict(cls, data: dict[str, Any]) -> RequestListState:
"""Restore state from dictionary."""import asyncio
import asyncio
from crawlee import Request
from crawlee.request_loaders import RequestList
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
async def main():
# Create list of requests to process
requests = [
'https://example.com/page1',
'https://example.com/page2',
'https://example.com/page3',
Request.from_url('https://example.com/api', user_data={'type': 'api'}),
]
# Create request list with persistence
request_list = await RequestList.open(
name='my-crawl',
requests=requests,
persist_state_key='crawl-state'
)
crawler = HttpCrawler()
@crawler.router.default_handler
async def handler(context: HttpCrawlingContext):
print(f"Processing: {context.request.url}")
data = {
'url': context.request.url,
'status': context.response.status_code,
'user_data': context.request.user_data
}
await context.push_data(data)
# Process requests from list
while not await request_list.is_empty():
request = await request_list.load_request()
if request:
try:
# Process request with crawler
await crawler._handle_request(request)
await request_list.mark_request_handled(request)
except Exception as e:
print(f"Request failed: {e}")
await request_list.reclaim_request(request)
print(f"Processed {request_list.processed_count} requests")
asyncio.run(main())
import asyncio
from crawlee.request_loaders import RequestManager
from crawlee.storages import RequestQueue
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
async def main():
# Create request manager with queue
queue = await RequestQueue.open(name='crawl-queue')
manager = await RequestManager.open(
name='dynamic-crawl',
request_queue=queue
)
crawler = HttpCrawler()
@crawler.router.default_handler
async def handler(context: HttpCrawlingContext):
print(f"Processing: {context.request.url}")
# Extract links and add them dynamically
# This is simplified - normally you'd parse HTML
if 'page1' in context.request.url:
await manager.add_requests([
'https://example.com/page2',
'https://example.com/page3'
])
data = {
'url': context.request.url,
'status': context.response.status_code
}
await context.push_data(data)
# Start with seed requests
await manager.add_requests([
'https://example.com/page1',
'https://example.com/start'
])
# Process requests dynamically
while not await manager.is_finished():
request = await manager.load_request()
if request:
try:
await crawler._handle_request(request)
await manager.mark_request_handled(request)
except Exception as e:
print(f"Request failed: {e}")
await manager.reclaim_request(request)
await asyncio.sleep(0.1) # Prevent tight loop
handled_count = await manager.get_handled_count()
print(f"Processed {handled_count} requests")
await manager.teardown()
asyncio.run(main())
import asyncio
from crawlee.request_loaders import RequestList, RequestManagerTandem
from crawlee.storages import RequestQueue
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
async def main():
# Static list of seed requests
seed_requests = [
'https://example.com/',
'https://example.com/products',
'https://example.com/about'
]
request_list = await RequestList.open(
name='seed-requests',
requests=seed_requests
)
# Dynamic queue for discovered requests
request_queue = await RequestQueue.open(name='discovered-requests')
# Create tandem manager
tandem = await RequestManagerTandem.open(
request_list=request_list,
request_queue=request_queue
)
crawler = HttpCrawler()
@crawler.router.default_handler
async def handler(context: HttpCrawlingContext):
url = context.request.url
print(f"Processing: {url}")
# Simulate link discovery and addition to queue
if '/products' in url:
# Add product pages to dynamic queue
await tandem.add_requests_batched([
f'https://example.com/product/{i}' for i in range(1, 6)
])
elif '/product/' in url:
# Add related products
product_id = url.split('/')[-1]
await tandem.add_request(f'https://example.com/product/{product_id}/reviews')
data = {
'url': url,
'source': 'seed' if url in seed_requests else 'discovered',
'status': context.response.status_code
}
await context.push_data(data)
# Process both static and dynamic requests
processed = 0
while not await tandem.is_finished():
request = await tandem.load_request()
if request:
try:
await crawler._handle_request(request)
await tandem.mark_request_handled(request)
processed += 1
except Exception as e:
print(f"Request failed: {e}")
await tandem.reclaim_request(request)
if processed % 10 == 0:
list_pending = tandem.request_list.pending_count
queue_info = await tandem.request_queue.get_info()
print(f"Processed: {processed}, List pending: {list_pending}, Queue pending: {queue_info.pending_request_count}")
print(f"Total processed: {processed}")
asyncio.run(main())
import asyncio
from crawlee.request_loaders import RequestList
async def simulate_crawl_interruption():
"""Simulate a crawl that gets interrupted and resumed."""
print("=== Starting initial crawl ===")
# Create request list with persistence
requests = [f'https://example.com/page{i}' for i in range(1, 21)]
request_list = await RequestList.open(
name='persistent-crawl',
requests=requests,
persist_state_key='crawl-progress'
)
# Process some requests, then simulate interruption
processed = 0
target_before_interruption = 10
while not await request_list.is_empty() and processed < target_before_interruption:
request = await request_list.load_request()
if request:
print(f"Processing: {request.url}")
await asyncio.sleep(0.1) # Simulate work
# Mark as handled
await request_list.mark_request_handled(request)
processed += 1
print(f"Processed {processed} requests before interruption")
print(f"Remaining: {request_list.pending_count}")
# Persist state before "crash"
await request_list.persist_state()
print("State persisted")
print("\n=== Simulating application restart ===")
# Create new request list instance (simulating restart)
new_request_list = await RequestList.open(
name='persistent-crawl',
requests=requests, # Same requests as before
persist_state_key='crawl-progress' # Same persistence key
)
print(f"After restart - Processed: {new_request_list.processed_count}")
print(f"After restart - Pending: {new_request_list.pending_count}")
# Continue processing from where we left off
while not await new_request_list.is_empty():
request = await new_request_list.load_request()
if request:
print(f"Resuming: {request.url}")
await asyncio.sleep(0.1)
await new_request_list.mark_request_handled(request)
processed += 1
print(f"Total processed: {processed}")
print("Crawl completed successfully after restart")
asyncio.run(simulate_crawl_interruption())
import asyncio
import heapq
from crawlee.request_loaders import RequestLoader
from crawlee import Request
class PriorityRequestLoader(RequestLoader):
"""Custom request loader with priority queuing."""
def __init__(self, requests: list[tuple[int, str | Request]]):
"""
Initialize with priority-request tuples.
Args:
requests: List of (priority, request) tuples (lower number = higher priority)
"""
self.heap = []
self.handled = set()
self.in_progress = {}
self.total_count = len(requests)
# Build priority heap
for priority, request in requests:
if isinstance(request, str):
request = Request.from_url(request)
heapq.heappush(self.heap, (priority, request.unique_key, request))
async def get_total_count(self) -> int:
return self.total_count
async def is_finished(self) -> bool:
return len(self.handled) == self.total_count
async def is_empty(self) -> bool:
return not self.heap and not self.in_progress
async def load_request(self) -> Request | None:
if not self.heap:
return None
priority, unique_key, request = heapq.heappop(self.heap)
# Track as in progress
self.in_progress[unique_key] = request
return request
async def mark_request_handled(self, request: Request) -> None:
unique_key = request.unique_key
if unique_key in self.in_progress:
del self.in_progress[unique_key]
self.handled.add(unique_key)
async def reclaim_request(self, request: Request) -> None:
unique_key = request.unique_key
if unique_key in self.in_progress:
# Return to heap with same priority (simplified - could implement retry logic)
heapq.heappush(self.heap, (0, unique_key, request)) # High priority for retry
del self.in_progress[unique_key]
async def main():
# Create requests with priorities (lower number = higher priority)
priority_requests = [
(1, 'https://example.com/important'), # High priority
(5, 'https://example.com/page1'), # Low priority
(1, 'https://example.com/urgent'), # High priority
(3, 'https://example.com/page2'), # Medium priority
(5, 'https://example.com/page3'), # Low priority
]
loader = PriorityRequestLoader(priority_requests)
print("Processing requests by priority:")
failed_once: set[str] = set()
while not await loader.is_finished():
request = await loader.load_request()
if request:
print(f"Processing: {request.url}")
# Simulate processing
await asyncio.sleep(0.1)
# Fail page2 once to exercise reclaim_request, then let the retry succeed
if 'page2' in request.url and request.url not in failed_once:
failed_once.add(request.url)
print(f" Failed: {request.url}")
await loader.reclaim_request(request)
else:
print(f" Completed: {request.url}")
await loader.mark_request_handled(request)
print("All requests processed")
asyncio.run(main())
Install with Tessl CLI
npx tessl i tessl/pypi-crawlee
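If you are not using the Tessl workflow, the underlying package can also be installed directly from PyPI:
pip install crawlee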