tessl/pypi-crawlee

A comprehensive web scraping and browser automation library for Python with human-like behavior and bot protection bypass

Events

Event-driven architecture for hooking into crawler lifecycle events. The events system lets you react to crawler state changes and implement cross-cutting concerns such as logging, monitoring, and custom workflows.

Capabilities

Event Manager

Abstract base class for event management systems that handle event emission and listener registration.

class EventManager:
    async def emit(
        self,
        event_name: Event,
        event_data: EventData | None = None
    ) -> None:
        """
        Emit an event to all registered listeners.

        Args:
            event_name: Name/type of the event
            event_data: Data associated with the event
        """

    def on(
        self,
        event_name: Event,
        listener: EventListener | None = None
    ) -> EventListener | Callable[[EventListener], EventListener]:
        """
        Register event listener for specific event.

        When listener is omitted, returns a decorator that registers
        the decorated function, enabling the @manager.on("event_name")
        form used in the examples below.

        Args:
            event_name: Event to listen for
            listener: Function to call when event occurs
        """

    def off(
        self,
        event_name: Event,
        listener: EventListener | None = None
    ) -> None:
        """
        Remove event listener(s).

        Args:
            event_name: Event to stop listening for
            listener: Specific listener to remove (None removes all)
        """

    def once(
        self,
        event_name: Event,
        listener: EventListener
    ) -> None:
        """
        Register listener that only fires once.

        Args:
            event_name: Event to listen for once
            listener: Function to call when event occurs
        """

Local Event Manager

Local implementation of event manager for single-process event handling.

class LocalEventManager(EventManager):
    def __init__(self): ...

    def get_listener_count(self, event_name: Event) -> int:
        """Get number of listeners for specific event."""

    def get_event_names(self) -> list[Event]:
        """Get list of all events with registered listeners."""

    def clear(self) -> None:
        """Remove all event listeners."""

Event Types

Core Events

Standard events emitted by crawlers during their lifecycle.

Event = Literal[
    "system_info",     # System resource information
    "persist_state",   # State persistence request
    "migrating",       # Data migration event
    "aborting",        # Crawler abort/stop event
    "exit"             # Crawler exit event
]
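
Because the event names are plain string literals, they can be iterated over. A sketch that attaches one logging listener to every lifecycle event:

from crawlee.events import LocalEventManager

LIFECYCLE_EVENTS = ("system_info", "persist_state", "migrating", "aborting", "exit")

manager = LocalEventManager()

def make_logger(event_name: str):
    # Separate closure per event so the log line names the event.
    async def log_event(data):
        print(f"[{event_name}] {data!r}")
    return log_event

for name in LIFECYCLE_EVENTS:
    manager.on(name, make_logger(name))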

Event Data Types

Base event data containers for different types of events.

class EventData:
    """Base class for all event data."""
    pass


class EventSystemInfoData(EventData):
    def __init__(
        self,
        *,
        cpu_usage_percent: float,
        memory_usage_bytes: int,
        event_loop_delay_ms: float | None = None,
        created_at: datetime | None = None
    ): ...

    @property
    def cpu_usage_percent(self) -> float:
        """Current CPU usage percentage."""

    @property
    def memory_usage_bytes(self) -> int:
        """Current memory usage in bytes."""

    @property
    def memory_usage_mb(self) -> float:
        """Current memory usage in megabytes."""

    @property
    def event_loop_delay_ms(self) -> float | None:
        """Event loop delay in milliseconds."""

    @property
    def created_at(self) -> datetime:
        """Timestamp when data was created."""
class EventPersistStateData(EventData):
    def __init__(
        self,
        *,
        is_migrating: bool = False,
        created_at: datetime | None = None
    ): ...

    @property
    def is_migrating(self) -> bool:
        """Whether persistence is due to migration."""

    @property
    def created_at(self) -> datetime:
        """Timestamp when persistence was requested."""
class EventMigratingData(EventData):
    def __init__(
        self,
        *,
        reason: str | None = None,
        created_at: datetime | None = None
    ): ...

    @property
    def reason(self) -> str | None:
        """Reason for migration."""

    @property
    def created_at(self) -> datetime:
        """Timestamp when migration started."""
class EventAbortingData(EventData):
    def __init__(
        self,
        *,
        reason: str | None = None,
        error: Exception | None = None,
        created_at: datetime | None = None
    ): ...

    @property
    def reason(self) -> str | None:
        """Reason for aborting."""

    @property
    def error(self) -> Exception | None:
        """Exception that caused abort (if any)."""

    @property
    def created_at(self) -> datetime:
        """Timestamp when abort occurred."""
class EventExitData(EventData):
    def __init__(
        self,
        *,
        exit_code: int = 0,
        reason: str | None = None,
        created_at: datetime | None = None
    ): ...

    @property
    def exit_code(self) -> int:
        """Exit code (0 for success)."""

    @property
    def reason(self) -> str | None:
        """Reason for exit."""

    @property
    def created_at(self) -> datetime:
        """Timestamp when exit occurred."""

Event Listener Type

Type definition for event listener functions.

EventListener = Callable[[EventData | None], Awaitable[None] | None]
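
Listeners may be plain functions or coroutine functions; both satisfy this type, and the manager presumably awaits the awaitable when a coroutine listener returns one:

from crawlee.events import LocalEventManager

def sync_listener(data) -> None:
    print("sync listener:", data)

async def async_listener(data) -> None:
    print("async listener:", data)

manager = LocalEventManager()
manager.on("exit", sync_listener)   # listener returns None
manager.on("exit", async_listener)  # listener returns an awaitable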

Usage Examples

Basic Event Handling

import asyncio
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.events import LocalEventManager, EventSystemInfoData

async def main():
    # Create event manager
    event_manager = LocalEventManager()

    # Register event listeners
    @event_manager.on("system_info")
    async def system_info_handler(data: EventSystemInfoData):
        print(f"System Info - CPU: {data.cpu_usage_percent:.1f}%, Memory: {data.memory_usage_mb:.1f}MB")

    @event_manager.on("persist_state")
    async def persist_state_handler(data):
        print("State persistence requested")
        if data and data.is_migrating:
            print("  Reason: Migration")

    @event_manager.on("aborting")
    async def aborting_handler(data):
        print(f"Crawler aborting: {data.reason if data else 'Unknown reason'}")
        if data and data.error:
            print(f"  Error: {data.error}")

    # Create crawler with event manager
    crawler = HttpCrawler(
        event_manager=event_manager,
        max_requests_per_crawl=10
    )

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        data = {
            'url': context.request.url,
            'status': context.response.status_code
        }
        await context.push_data(data)

    # Run crawler - events will be emitted during execution
    urls = ['https://httpbin.org/delay/1'] * 5
    await crawler.run(urls)

asyncio.run(main())

Custom Event System

import asyncio
from datetime import datetime
from crawlee.events import LocalEventManager, EventData

class CustomEventData(EventData):
    """Custom event data for application-specific events."""

    def __init__(self, message: str, data: dict | None = None):
        self.message = message
        self.data = data or {}
        self.timestamp = datetime.now()

class CrawlerWithCustomEvents:
    """Crawler wrapper that emits custom events."""

    def __init__(self):
        self.event_manager = LocalEventManager()
        self.stats = {
            'requests_processed': 0,
            'requests_failed': 0
        }

    async def emit_custom_event(self, event_name: str, message: str, data: dict | None = None):
        """Emit custom event with data."""
        event_data = CustomEventData(message, data)
        await self.event_manager.emit(event_name, event_data)

    async def process_request(self, url: str):
        """Simulate request processing with events."""
        try:
            # Emit start event
            await self.emit_custom_event(
                "request_start",
                f"Starting request to {url}",
                {"url": url}
            )

            # Simulate processing
            await asyncio.sleep(0.5)

            # Emit progress events
            await self.emit_custom_event(
                "request_progress",
                f"Downloaded content from {url}",
                {"url": url, "step": "download"}
            )

            await asyncio.sleep(0.3)

            await self.emit_custom_event(
                "request_progress",
                f"Parsing content from {url}",
                {"url": url, "step": "parse"}
            )

            # Success
            self.stats['requests_processed'] += 1
            await self.emit_custom_event(
                "request_complete",
                f"Successfully processed {url}",
                {"url": url, "status": "success"}
            )

        except Exception as e:
            # Failure
            self.stats['requests_failed'] += 1
            await self.emit_custom_event(
                "request_error",
                f"Failed to process {url}: {e}",
                {"url": url, "error": str(e)}
            )

    async def run(self, urls: list[str]):
        """Run crawler with event emission."""
        await self.emit_custom_event(
            "crawl_start",
            f"Starting crawl with {len(urls)} URLs",
            {"url_count": len(urls)}
        )

        for url in urls:
            await self.process_request(url)

        await self.emit_custom_event(
            "crawl_complete",
            "Crawl completed",
            {
                "total_urls": len(urls),
                "processed": self.stats['requests_processed'],
                "failed": self.stats['requests_failed']
            }
        )

async def main():
    crawler = CrawlerWithCustomEvents()

    # Register event listeners
    @crawler.event_manager.on("crawl_start")
    async def crawl_start_handler(data: CustomEventData):
        print(f"🚀 {data.message}")
        print(f"   URLs to process: {data.data['url_count']}")

    @crawler.event_manager.on("request_start")
    async def request_start_handler(data: CustomEventData):
        print(f"📥 {data.message}")

    @crawler.event_manager.on("request_progress")
    async def request_progress_handler(data: CustomEventData):
        step = data.data.get('step', 'unknown')
        url = data.data.get('url', 'unknown')
        print(f"⚙️  {step.capitalize()}: {url}")

    @crawler.event_manager.on("request_complete")
    async def request_complete_handler(data: CustomEventData):
        print(f"✅ {data.message}")

    @crawler.event_manager.on("request_error")
    async def request_error_handler(data: CustomEventData):
        print(f"❌ {data.message}")

    @crawler.event_manager.on("crawl_complete")
    async def crawl_complete_handler(data: CustomEventData):
        print(f"🎉 {data.message}")
        print(f"   Processed: {data.data['processed']}")
        print(f"   Failed: {data.data['failed']}")

    # Run crawler
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3'
    ]

    await crawler.run(urls)

asyncio.run(main())

Event-Based Monitoring

import asyncio
import time
from datetime import datetime
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.events import LocalEventManager, EventSystemInfoData

class CrawlerMonitor:
    """Monitor crawler performance using events."""

    def __init__(self):
        self.start_time = None
        self.request_times = []
        self.system_snapshots = []
        self.error_count = 0
        self.success_count = 0

    def setup_monitoring(self, event_manager: LocalEventManager):
        """Setup event listeners for monitoring."""

        @event_manager.on("system_info")
        async def monitor_system(data: EventSystemInfoData):
            self.system_snapshots.append({
                'timestamp': data.created_at,
                'cpu_percent': data.cpu_usage_percent,
                'memory_mb': data.memory_usage_mb,
                'event_loop_delay': data.event_loop_delay_ms
            })

        @event_manager.on("persist_state")
        async def monitor_persistence(data):
            print(f"💾 State persistence at {datetime.now()}")

        @event_manager.on("aborting")
        async def monitor_abort(data):
            print(f"🛑 Crawler aborting: {data.reason if data else 'Unknown'}")
            await self.generate_report()

    async def start_monitoring(self):
        """Start monitoring session."""
        self.start_time = datetime.now()
        print(f"📊 Monitoring started at {self.start_time}")

    async def record_request_start(self, url: str):
        """Record request start time."""
        return time.time()

    async def record_request_end(self, start_time: float, success: bool):
        """Record request completion."""
        duration = (time.time() - start_time) * 1000  # Convert to ms
        self.request_times.append(duration)

        if success:
            self.success_count += 1
        else:
            self.error_count += 1

    async def generate_report(self):
        """Generate monitoring report."""
        if not self.start_time:
            return

        duration = datetime.now() - self.start_time

        print(f"\n📊 Crawler Monitoring Report")
        print(f"=" * 40)
        print(f"Total Duration: {duration}")
        print(f"Requests: {self.success_count + self.error_count}")
        print(f"  Success: {self.success_count}")
        print(f"  Errors: {self.error_count}")

        if self.request_times:
            avg_time = sum(self.request_times) / len(self.request_times)
            min_time = min(self.request_times)
            max_time = max(self.request_times)

            print(f"Request Times:")
            print(f"  Average: {avg_time:.2f}ms")
            print(f"  Min: {min_time:.2f}ms")
            print(f"  Max: {max_time:.2f}ms")

        if self.system_snapshots:
            avg_cpu = sum(s['cpu_percent'] for s in self.system_snapshots) / len(self.system_snapshots)
            avg_memory = sum(s['memory_mb'] for s in self.system_snapshots) / len(self.system_snapshots)

            print(f"System Resources:")
            print(f"  Average CPU: {avg_cpu:.1f}%")
            print(f"  Average Memory: {avg_memory:.1f}MB")
            print(f"  Snapshots: {len(self.system_snapshots)}")

async def main():
    monitor = CrawlerMonitor()
    event_manager = LocalEventManager()

    # Setup monitoring
    monitor.setup_monitoring(event_manager)

    crawler = HttpCrawler(
        event_manager=event_manager,
        max_requests_per_crawl=20
    )

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext):
        start_time = await monitor.record_request_start(context.request.url)

        try:
            # Simulate processing
            await asyncio.sleep(0.5)

            data = {
                'url': context.request.url,
                'status': context.response.status_code,
                'timestamp': datetime.now().isoformat()
            }

            await context.push_data(data)
            await monitor.record_request_end(start_time, success=True)

        except Exception:
            await monitor.record_request_end(start_time, success=False)
            raise

    await monitor.start_monitoring()

    # Start crawling
    urls = [f'https://httpbin.org/delay/{i%3+1}' for i in range(15)]
    await crawler.run(urls)

    # Generate final report
    await monitor.generate_report()

asyncio.run(main())

Event-Based Workflow Control

import asyncio
from datetime import datetime
from crawlee.events import LocalEventManager, EventData

class WorkflowEventData(EventData):
    """Event data for workflow control."""

    def __init__(self, step: str, data: dict | None = None):
        self.step = step
        self.data = data or {}
        self.timestamp = datetime.now()

class WorkflowController:
    """Control crawler workflow using events."""

    def __init__(self):
        self.event_manager = LocalEventManager()
        self.current_step = "idle"
        self.workflow_data = {}
        self.should_pause = False
        self.should_stop = False

    async def emit_workflow_event(self, step: str, data: dict | None = None):
        """Emit workflow event."""
        event_data = WorkflowEventData(step, data)
        await self.event_manager.emit("workflow", event_data)

    def setup_workflow_control(self):
        """Setup workflow event handlers."""

        @self.event_manager.on("workflow")
        async def workflow_handler(data: WorkflowEventData):
            self.current_step = data.step

            if data.step == "pause_requested":
                self.should_pause = True
                print("⏸️  Workflow pause requested")

            elif data.step == "resume_requested":
                self.should_pause = False
                print("▶️  Workflow resume requested")

            elif data.step == "stop_requested":
                self.should_stop = True
                print("🛑 Workflow stop requested")

            elif data.step == "step_complete":
                step_name = data.data.get('name', 'unknown')
                print(f"✅ Step completed: {step_name}")

            elif data.step == "error_occurred":
                error_msg = data.data.get('error', 'Unknown error')
                print(f"❌ Workflow error: {error_msg}")

    async def wait_for_resume(self):
        """Wait while paused."""
        while self.should_pause and not self.should_stop:
            print("⏸️  Workflow paused, waiting for resume...")
            await asyncio.sleep(1)

    async def check_workflow_control(self):
        """Check if workflow should continue."""
        if self.should_stop:
            await self.emit_workflow_event("workflow_stopped")
            return False

        if self.should_pause:
            await self.wait_for_resume()

        # Re-check: a stop may have been requested while paused
        return not self.should_stop

    async def execute_step(self, step_name: str, step_func, *args, **kwargs):
        """Execute workflow step with control checks."""
        if not await self.check_workflow_control():
            return False

        try:
            await self.emit_workflow_event("step_start", {"name": step_name})

            result = await step_func(*args, **kwargs)

            await self.emit_workflow_event("step_complete", {
                "name": step_name,
                "result": result
            })

            return True

        except Exception as e:
            await self.emit_workflow_event("error_occurred", {
                "step": step_name,
                "error": str(e)
            })
            return False

    async def run_workflow(self, steps: list):
        """Run workflow with event-based control."""
        await self.emit_workflow_event("workflow_start", {"steps": len(steps)})

        for step_name, step_func, args, kwargs in steps:
            success = await self.execute_step(step_name, step_func, *args, **kwargs)

            if not success:
                await self.emit_workflow_event("workflow_failed", {"failed_step": step_name})
                return False

        await self.emit_workflow_event("workflow_complete")
        return True

async def main():
    controller = WorkflowController()
    controller.setup_workflow_control()

    # Example workflow steps
    async def fetch_data(url: str):
        print(f"📥 Fetching data from {url}")
        await asyncio.sleep(1)
        return f"data_from_{url.split('//')[-1]}"

    async def process_data(data: str):
        print(f"⚙️  Processing {data}")
        await asyncio.sleep(0.5)
        return f"processed_{data}"

    async def save_results(data: str):
        print(f"💾 Saving {data}")
        await asyncio.sleep(0.3)
        return "saved"

    # Define workflow
    workflow_steps = [
        ("fetch_data", fetch_data, ("https://example.com/api",), {}),
        ("process_data", process_data, ("raw_data",), {}),
        ("save_results", save_results, ("processed_data",), {})
    ]

    # Start workflow
    workflow_task = asyncio.create_task(
        controller.run_workflow(workflow_steps)
    )

    # Simulate user control: request a pause while the workflow is still mid-run
    await asyncio.sleep(1)
    await controller.emit_workflow_event("pause_requested")

    # Resume after 3 seconds
    await asyncio.sleep(3)
    await controller.emit_workflow_event("resume_requested")

    # Wait for workflow completion
    success = await workflow_task

    if success:
        print("🎉 Workflow completed successfully")
    else:
        print("💥 Workflow failed")

asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-crawlee
