CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-crawlee

A comprehensive web scraping and browser automation library for Python with human-like behavior and bot protection bypass

Overview
Eval results
Files

request-management.mddocs/

Request Management

Advanced request lifecycle management with support for static lists, dynamic queues, and tandem operations. Request management components provide flexible ways to feed requests to crawlers and manage request processing workflows.

Capabilities

Request Loader

Abstract base class for loading and providing requests to crawlers.

class RequestLoader:
    async def get_total_count(self) -> int:
        """
        Get total number of requests.

        Returns:
            Total request count
        """

    async def is_finished(self) -> bool:
        """
        Check if all requests have been processed.

        Returns:
            True if no more requests available
        """

    async def is_empty(self) -> bool:
        """
        Check if loader has no pending requests.

        Returns:
            True if no pending requests
        """

    async def load_request(self) -> Request | None:
        """
        Load next request for processing.

        Returns:
            Request object or None if no more requests
        """

    async def mark_request_handled(self, request: Request) -> None:
        """Mark request as successfully processed."""

    async def reclaim_request(self, request: Request) -> None:
        """Return request to loader for retry."""

Request List

Static list of requests that can be processed in order with built-in state persistence.

class RequestList(RequestLoader):
    def __init__(
        self,
        requests: list[str | Request],
        *,
        persist_state_key: str | None = None,
        persist_state_key_value_store_id: str | None = None
    ): ...

    @classmethod
    async def open(
        cls,
        name: str | None = None,
        *,
        requests: list[str | Request],
        persist_state_key: str | None = None,
        **kwargs
    ) -> RequestList:
        """
        Open or create request list with persistence.

        Args:
            name: List identifier for persistence
            requests: List of requests to process
            persist_state_key: Key for state persistence

        Returns:
            RequestList instance
        """

    async def add_request(self, request: str | Request) -> None:
        """Add request to the list."""

    async def add_requests(self, requests: list[str | Request]) -> None:
        """Add multiple requests to the list."""

    async def get_state(self) -> RequestListState:
        """Get current state for persistence."""

    async def persist_state(self) -> None:
        """Save state to persistent storage."""

    async def initialize(self) -> None:
        """Initialize list and restore state if configured."""

    @property
    def length(self) -> int:
        """Total number of requests in list."""

    @property
    def processed_count(self) -> int:
        """Number of processed requests."""

    @property
    def pending_count(self) -> int:
        """Number of pending requests."""

Request Manager

Dynamic request management with automatic queue and list coordination.

class RequestManager(RequestLoader):
    def __init__(
        self,
        *,
        request_queue: RequestQueue | None = None,
        request_list: RequestList | None = None,
        max_cached_requests: int = 1000
    ): ...

    @classmethod
    async def open(
        cls,
        name: str | None = None,
        *,
        request_queue: RequestQueue | None = None,
        request_list: RequestList | None = None,
        **kwargs
    ) -> RequestManager:
        """
        Open or create request manager.

        Args:
            name: Manager identifier
            request_queue: Queue for dynamic requests
            request_list: List of static requests

        Returns:
            RequestManager instance
        """

    async def add_request(
        self,
        request: str | Request,
        *,
        forefront: bool = False
    ) -> None:
        """
        Add request to manager.

        Args:
            request: Request to add
            forefront: Add to front for priority processing
        """

    async def add_requests(
        self,
        requests: list[str | Request],
        *,
        forefront: bool = False
    ) -> None:
        """Add multiple requests to manager."""

    async def get_handled_count(self) -> int:
        """Get number of handled requests."""

    async def initialize(self) -> None:
        """Initialize manager components."""

    async def teardown(self) -> None:
        """Clean up manager resources."""

Request Manager Tandem

Tandem system coordinating between request list and queue for hybrid request processing.

class RequestManagerTandem(RequestLoader):
    def __init__(
        self,
        *,
        request_list: RequestList,
        request_queue: RequestQueue
    ): ...

    @classmethod
    async def open(
        cls,
        *,
        request_list: RequestList,
        request_queue: RequestQueue
    ) -> RequestManagerTandem:
        """
        Create tandem manager with list and queue.

        Args:
            request_list: Static request list
            request_queue: Dynamic request queue

        Returns:
            RequestManagerTandem instance
        """

    async def add_request(
        self,
        request: str | Request,
        *,
        forefront: bool = False
    ) -> None:
        """Add request to queue (dynamic requests)."""

    async def add_requests_batched(
        self,
        requests: list[str | Request],
        *,
        forefront: bool = False
    ) -> None:
        """Add multiple requests to queue in batch."""

    @property
    def request_list(self) -> RequestList:
        """Access to request list component."""

    @property
    def request_queue(self) -> RequestQueue:
        """Access to request queue component."""

State Management

Request List State

State information for request list persistence and recovery.

class RequestListState:
    def __init__(
        self,
        *,
        next_index: int = 0,
        next_unique_key: str | None = None,
        in_progress: dict[str, Request] | None = None
    ): ...

    @property
    def next_index(self) -> int:
        """Index of next request to process."""

    @property
    def next_unique_key(self) -> str | None:
        """Unique key of next request."""

    @property
    def in_progress(self) -> dict[str, Request]:
        """Requests currently being processed."""

    def to_dict(self) -> dict[str, any]:
        """Serialize state to dictionary."""

    @classmethod
    def from_dict(cls, data: dict[str, any]) -> RequestListState:
        """Restore state from dictionary."""

Usage Examples

Static Request List

import asyncio
from crawlee.request_loaders import RequestList
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

async def main():
    # Create list of requests to process
    requests = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
        {'url': 'https://example.com/api', 'userData': {'type': 'api'}},
    ]

    # Create request list with persistence
    request_list = await RequestList.open(
        name='my-crawl',
        requests=requests,
        persist_state_key='crawl-state'
    )

    crawler = HttpCrawler()

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        print(f"Processing: {context.request.url}")

        data = {
            'url': context.request.url,
            'status': context.response.status_code,
            'user_data': context.request.user_data
        }

        await context.push_data(data)

    # Process requests from list
    while not await request_list.is_empty():
        request = await request_list.load_request()
        if request:
            try:
                # Process request with crawler
                await crawler._handle_request(request)
                await request_list.mark_request_handled(request)

            except Exception as e:
                print(f"Request failed: {e}")
                await request_list.reclaim_request(request)

    print(f"Processed {request_list.processed_count} requests")

asyncio.run(main())

Dynamic Request Manager

import asyncio
from crawlee.request_loaders import RequestManager
from crawlee.storages import RequestQueue
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

async def main():
    # Create request manager with queue
    queue = await RequestQueue.open('crawl-queue')

    manager = await RequestManager.open(
        name='dynamic-crawl',
        request_queue=queue
    )

    crawler = HttpCrawler()

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        print(f"Processing: {context.request.url}")

        # Extract links and add them dynamically
        # This is simplified - normally you'd parse HTML
        if 'page1' in context.request.url:
            await manager.add_requests([
                'https://example.com/page2',
                'https://example.com/page3'
            ])

        data = {
            'url': context.request.url,
            'status': context.response.status_code
        }

        await context.push_data(data)

    # Start with seed requests
    await manager.add_requests([
        'https://example.com/page1',
        'https://example.com/start'
    ])

    # Process requests dynamically
    while not await manager.is_finished():
        request = await manager.load_request()
        if request:
            try:
                await crawler._handle_request(request)
                await manager.mark_request_handled(request)

            except Exception as e:
                print(f"Request failed: {e}")
                await manager.reclaim_request(request)

        await asyncio.sleep(0.1)  # Prevent tight loop

    handled_count = await manager.get_handled_count()
    print(f"Processed {handled_count} requests")

    await manager.teardown()

asyncio.run(main())

Tandem Request Processing

import asyncio
from crawlee.request_loaders import RequestList, RequestManagerTandem
from crawlee.storages import RequestQueue
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

async def main():
    # Static list of seed requests
    seed_requests = [
        'https://example.com/',
        'https://example.com/products',
        'https://example.com/about'
    ]

    request_list = await RequestList.open(
        name='seed-requests',
        requests=seed_requests
    )

    # Dynamic queue for discovered requests
    request_queue = await RequestQueue.open('discovered-requests')

    # Create tandem manager
    tandem = await RequestManagerTandem.open(
        request_list=request_list,
        request_queue=request_queue
    )

    crawler = HttpCrawler()

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        url = context.request.url
        print(f"Processing: {url}")

        # Simulate link discovery and addition to queue
        if '/products' in url:
            # Add product pages to dynamic queue
            await tandem.add_requests_batched([
                f'https://example.com/product/{i}' for i in range(1, 6)
            ])

        elif '/product/' in url:
            # Add related products
            product_id = url.split('/')[-1]
            await tandem.add_request(f'https://example.com/product/{product_id}/reviews')

        data = {
            'url': url,
            'source': 'seed' if url in seed_requests else 'discovered',
            'status': context.response.status_code
        }

        await context.push_data(data)

    # Process both static and dynamic requests
    processed = 0
    while not await tandem.is_finished():
        request = await tandem.load_request()
        if request:
            try:
                await crawler._handle_request(request)
                await tandem.mark_request_handled(request)
                processed += 1

            except Exception as e:
                print(f"Request failed: {e}")
                await tandem.reclaim_request(request)

        if processed % 10 == 0:
            list_pending = tandem.request_list.pending_count
            queue_info = await tandem.request_queue.get_info()
            print(f"Processed: {processed}, List pending: {list_pending}, Queue pending: {queue_info.pending_request_count}")

    print(f"Total processed: {processed}")

asyncio.run(main())

Request List with State Persistence

import asyncio
from crawlee.request_loaders import RequestList
from crawlee.storages import KeyValueStore

async def simulate_crawl_interruption():
    """Simulate a crawl that gets interrupted and resumed."""

    print("=== Starting initial crawl ===")

    # Create request list with persistence
    requests = [f'https://example.com/page{i}' for i in range(1, 21)]

    request_list = await RequestList.open(
        name='persistent-crawl',
        requests=requests,
        persist_state_key='crawl-progress'
    )

    # Process some requests, then simulate interruption
    processed = 0
    target_before_interruption = 10

    while not await request_list.is_empty() and processed < target_before_interruption:
        request = await request_list.load_request()
        if request:
            print(f"Processing: {request.url}")
            await asyncio.sleep(0.1)  # Simulate work

            # Mark as handled
            await request_list.mark_request_handled(request)
            processed += 1

    print(f"Processed {processed} requests before interruption")
    print(f"Remaining: {request_list.pending_count}")

    # Persist state before "crash"
    await request_list.persist_state()
    print("State persisted")

    print("\n=== Simulating application restart ===")

    # Create new request list instance (simulating restart)
    new_request_list = await RequestList.open(
        name='persistent-crawl',
        requests=requests,  # Same requests as before
        persist_state_key='crawl-progress'  # Same persistence key
    )

    print(f"After restart - Processed: {new_request_list.processed_count}")
    print(f"After restart - Pending: {new_request_list.pending_count}")

    # Continue processing from where we left off
    while not await new_request_list.is_empty():
        request = await new_request_list.load_request()
        if request:
            print(f"Resuming: {request.url}")
            await asyncio.sleep(0.1)

            await new_request_list.mark_request_handled(request)
            processed += 1

    print(f"Total processed: {processed}")
    print("Crawl completed successfully after restart")

asyncio.run(simulate_crawl_interruption())

Custom Request Loader

import asyncio
from typing import Iterator
from crawlee.request_loaders import RequestLoader
from crawlee import Request

class PriorityRequestLoader(RequestLoader):
    """Custom request loader with priority queuing."""

    def __init__(self, requests: list[tuple[int, str | Request]]):
        """
        Initialize with priority-request tuples.

        Args:
            requests: List of (priority, request) tuples (lower number = higher priority)
        """
        import heapq

        self.heap = []
        self.handled = set()
        self.in_progress = {}
        self.total_count = len(requests)

        # Build priority heap
        for priority, request in requests:
            if isinstance(request, str):
                request = Request(request)

            heapq.heappush(self.heap, (priority, request.unique_key, request))

    async def get_total_count(self) -> int:
        return self.total_count

    async def is_finished(self) -> bool:
        return len(self.handled) == self.total_count

    async def is_empty(self) -> bool:
        return not self.heap and not self.in_progress

    async def load_request(self) -> Request | None:
        if not self.heap:
            return None

        import heapq
        priority, unique_key, request = heapq.heappop(self.heap)

        # Track as in progress
        self.in_progress[unique_key] = request

        return request

    async def mark_request_handled(self, request: Request) -> None:
        unique_key = request.unique_key

        if unique_key in self.in_progress:
            del self.in_progress[unique_key]
            self.handled.add(unique_key)

    async def reclaim_request(self, request: Request) -> None:
        unique_key = request.unique_key

        if unique_key in self.in_progress:
            # Return to heap with same priority (simplified - could implement retry logic)
            import heapq
            heapq.heappush(self.heap, (0, unique_key, request))  # High priority for retry
            del self.in_progress[unique_key]

async def main():
    # Create requests with priorities (lower number = higher priority)
    priority_requests = [
        (1, 'https://example.com/important'),    # High priority
        (5, 'https://example.com/page1'),        # Low priority
        (1, 'https://example.com/urgent'),       # High priority
        (3, 'https://example.com/page2'),        # Medium priority
        (5, 'https://example.com/page3'),        # Low priority
    ]

    loader = PriorityRequestLoader(priority_requests)

    print("Processing requests by priority:")
    while not await loader.is_finished():
        request = await loader.load_request()
        if request:
            print(f"Processing: {request.url}")

            # Simulate processing
            await asyncio.sleep(0.1)

            # Occasionally fail to test retry
            if 'page2' in request.url:
                print(f"  Failed: {request.url}")
                await loader.reclaim_request(request)
            else:
                print(f"  Completed: {request.url}")
                await loader.mark_request_handled(request)

    print("All requests processed")

asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-crawlee

docs

browser-automation.md

cli-tools.md

configuration.md

core-types.md

crawlers.md

error-handling.md

events.md

fingerprinting.md

http-clients.md

index.md

request-management.md

sessions.md

statistics.md

storage.md

tile.json