A comprehensive web scraping and browser automation library for Python with human-like behavior and bot protection bypass
Event-driven architecture for hooking into crawler lifecycle events and implementing custom behaviors. The events system provides a way to react to various crawler states and implement cross-cutting concerns like logging, monitoring, and custom workflows.
Abstract base class for event management systems that handle event emission and listener registration.
class EventManager:
async def emit(
self,
event_name: Event,
event_data: EventData | None = None
) -> None:
"""
Emit an event to all registered listeners.
Args:
event_name: Name/type of the event
event_data: Data associated with the event
"""
def on(
self,
event_name: Event,
listener: EventListener
) -> None:
"""
Register event listener for specific event.
Args:
event_name: Event to listen for
listener: Function to call when event occurs
"""
def off(
self,
event_name: Event,
listener: EventListener | None = None
) -> None:
"""
Remove event listener(s).
Args:
event_name: Event to stop listening for
listener: Specific listener to remove (None removes all)
"""
def once(
self,
event_name: Event,
listener: EventListener
) -> None:
"""
Register listener that only fires once.
Args:
event_name: Event to listen for once
listener: Function to call when event occurs
"""Local implementation of event manager for single-process event handling.
Local implementation of event manager for single-process event handling.
class LocalEventManager(EventManager):
def __init__(self): ...
def get_listener_count(self, event_name: Event) -> int:
"""Get number of listeners for specific event."""
def get_event_names(self) -> list[Event]:
"""Get list of all events with registered listeners."""
def clear(self) -> None:
"""Remove all event listeners."""Standard events emitted by crawlers during their lifecycle.
Standard events emitted by crawlers during their lifecycle.
Event = Literal[
"system_info", # System resource information
"persist_state", # State persistence request
"migrating", # Data migration event
"aborting", # Crawler abort/stop event
"exit" # Crawler exit event
]
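Because Event is a Literal type, a static type checker rejects unknown event names; a small illustration (assuming Event is importable from crawlee.events):
from crawlee.events import Event  # assumed export of the Literal above

def describe(event: Event) -> str:
    return f"lifecycle event: {event}"

describe("persist_state")  # accepted
describe("not_an_event")   # rejected by a type checker such as mypy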
Base event data containers for different types of events.
class EventData:
"""Base class for all event data."""
    pass

class EventSystemInfoData(EventData):
def __init__(
self,
*,
cpu_usage_percent: float,
memory_usage_bytes: int,
event_loop_delay_ms: float | None = None,
created_at: datetime | None = None
): ...
@property
def cpu_usage_percent(self) -> float:
"""Current CPU usage percentage."""
@property
def memory_usage_bytes(self) -> int:
"""Current memory usage in bytes."""
@property
def memory_usage_mb(self) -> float:
"""Current memory usage in megabytes."""
@property
def event_loop_delay_ms(self) -> float | None:
"""Event loop delay in milliseconds."""
@property
def created_at(self) -> datetime:
"""Timestamp when data was created."""class EventPersistStateData(EventData):
def __init__(
self,
*,
is_migrating: bool = False,
created_at: datetime | None = None
): ...
@property
def is_migrating(self) -> bool:
"""Whether persistence is due to migration."""
@property
def created_at(self) -> datetime:
"""Timestamp when persistence was requested."""class EventMigratingData(EventData):
def __init__(
self,
*,
reason: str | None = None,
created_at: datetime | None = None
): ...
@property
def reason(self) -> str | None:
"""Reason for migration."""
@property
def created_at(self) -> datetime:
"""Timestamp when migration started."""class EventAbortingData(EventData):
def __init__(
self,
*,
reason: str | None = None,
error: Exception | None = None,
created_at: datetime | None = None
): ...
@property
def reason(self) -> str | None:
"""Reason for aborting."""
@property
def error(self) -> Exception | None:
"""Exception that caused abort (if any)."""
@property
def created_at(self) -> datetime:
"""Timestamp when abort occurred."""class EventExitData(EventData):
def __init__(
self,
*,
exit_code: int = 0,
reason: str | None = None,
created_at: datetime | None = None
): ...
@property
def exit_code(self) -> int:
"""Exit code (0 for success)."""
@property
def reason(self) -> str | None:
"""Reason for exit."""
@property
def created_at(self) -> datetime:
"""Timestamp when exit occurred."""Type definition for event listener functions.
Type definition for event listener functions.
EventListener = Callable[[EventData | None], Awaitable[None] | None]
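Per this type, a listener may be a plain function or a coroutine function, and it must accept the event data argument (which may be None); a sketch:
from crawlee.events import EventData, LocalEventManager

def sync_listener(data: EventData | None) -> None:
    print("sync listener fired")

async def async_listener(data: EventData | None) -> None:
    print("async listener fired")

manager = LocalEventManager()
manager.on("persist_state", sync_listener)   # both satisfy EventListener
manager.on("persist_state", async_listener)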
A basic example: registering listeners for built-in crawler lifecycle events.
import asyncio
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.events import LocalEventManager, EventSystemInfoData
async def main():
# Create event manager
event_manager = LocalEventManager()
    # Define event listeners, then register them (on() takes the event name and the listener)
    async def system_info_handler(data: EventSystemInfoData):
        print(f"System Info - CPU: {data.cpu_usage_percent:.1f}%, Memory: {data.memory_usage_mb:.1f}MB")

    async def persist_state_handler(data):
        print("State persistence requested")
        if data and data.is_migrating:
            print("  Reason: Migration")

    async def aborting_handler(data):
        print(f"Crawler aborting: {data.reason if data else 'Unknown reason'}")
        if data and data.error:
            print(f"  Error: {data.error}")

    event_manager.on("system_info", system_info_handler)
    event_manager.on("persist_state", persist_state_handler)
    event_manager.on("aborting", aborting_handler)
# Create crawler with event manager
crawler = HttpCrawler(
event_manager=event_manager,
max_requests_per_crawl=10
)
@crawler.router.default_handler
async def handler(context: HttpCrawlingContext):
data = {
'url': context.request.url,
'status': context.response.status_code
}
await context.push_data(data)
# Run crawler - events will be emitted during execution
    # Use distinct URLs - requests with the same URL are deduplicated by default
    urls = [f'https://httpbin.org/delay/1?page={i}' for i in range(5)]
await crawler.run(urls)
asyncio.run(main())

Emitting custom application events by subclassing EventData.
import asyncio
from datetime import datetime
from crawlee.events import LocalEventManager, EventData
class CustomEventData(EventData):
"""Custom event data for application-specific events."""
    def __init__(self, message: str, data: dict | None = None):
self.message = message
self.data = data or {}
self.timestamp = datetime.now()
class CrawlerWithCustomEvents:
"""Crawler wrapper that emits custom events."""
def __init__(self):
self.event_manager = LocalEventManager()
self.stats = {
'requests_processed': 0,
'requests_failed': 0
}
    async def emit_custom_event(self, event_name: str, message: str, data: dict | None = None):
"""Emit custom event with data."""
event_data = CustomEventData(message, data)
await self.event_manager.emit(event_name, event_data)
async def process_request(self, url: str):
"""Simulate request processing with events."""
try:
# Emit start event
await self.emit_custom_event(
"request_start",
f"Starting request to {url}",
{"url": url}
)
# Simulate processing
await asyncio.sleep(0.5)
# Emit progress events
await self.emit_custom_event(
"request_progress",
f"Downloaded content from {url}",
{"url": url, "step": "download"}
)
await asyncio.sleep(0.3)
await self.emit_custom_event(
"request_progress",
f"Parsing content from {url}",
{"url": url, "step": "parse"}
)
# Success
self.stats['requests_processed'] += 1
await self.emit_custom_event(
"request_complete",
f"Successfully processed {url}",
{"url": url, "status": "success"}
)
except Exception as e:
# Failure
self.stats['requests_failed'] += 1
await self.emit_custom_event(
"request_error",
f"Failed to process {url}: {e}",
{"url": url, "error": str(e)}
)
async def run(self, urls: list[str]):
"""Run crawler with event emission."""
await self.emit_custom_event(
"crawl_start",
f"Starting crawl with {len(urls)} URLs",
{"url_count": len(urls)}
)
for url in urls:
await self.process_request(url)
await self.emit_custom_event(
"crawl_complete",
"Crawl completed",
{
"total_urls": len(urls),
"processed": self.stats['requests_processed'],
"failed": self.stats['requests_failed']
}
)
async def main():
crawler = CrawlerWithCustomEvents()
    # Define listeners, then register one per custom event name
    async def crawl_start_handler(data: CustomEventData):
        print(f"🚀 {data.message}")
        print(f"   URLs to process: {data.data['url_count']}")

    async def request_start_handler(data: CustomEventData):
        print(f"📥 {data.message}")

    async def request_progress_handler(data: CustomEventData):
        step = data.data.get('step', 'unknown')
        url = data.data.get('url', 'unknown')
        print(f"⚙️ {step.capitalize()}: {url}")

    async def request_complete_handler(data: CustomEventData):
        print(f"✅ {data.message}")

    async def request_error_handler(data: CustomEventData):
        print(f"❌ {data.message}")

    async def crawl_complete_handler(data: CustomEventData):
        print(f"🎉 {data.message}")
        print(f"   Processed: {data.data['processed']}")
        print(f"   Failed: {data.data['failed']}")

    crawler.event_manager.on("crawl_start", crawl_start_handler)
    crawler.event_manager.on("request_start", request_start_handler)
    crawler.event_manager.on("request_progress", request_progress_handler)
    crawler.event_manager.on("request_complete", request_complete_handler)
    crawler.event_manager.on("request_error", request_error_handler)
    crawler.event_manager.on("crawl_complete", crawl_complete_handler)
# Run crawler
urls = [
'https://example.com/page1',
'https://example.com/page2',
'https://example.com/page3'
]
await crawler.run(urls)
asyncio.run(main())

Monitoring crawler performance with system_info and other lifecycle events.
import asyncio
import time
from datetime import datetime
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.events import LocalEventManager, EventSystemInfoData
class CrawlerMonitor:
"""Monitor crawler performance using events."""
def __init__(self):
self.start_time = None
self.request_times = []
self.system_snapshots = []
self.error_count = 0
self.success_count = 0
def setup_monitoring(self, event_manager: LocalEventManager):
"""Setup event listeners for monitoring."""
@event_manager.on("system_info")
async def monitor_system(data: EventSystemInfoData):
self.system_snapshots.append({
'timestamp': data.created_at,
'cpu_percent': data.cpu_usage_percent,
'memory_mb': data.memory_usage_mb,
'event_loop_delay': data.event_loop_delay_ms
})
@event_manager.on("persist_state")
async def monitor_persistence(data):
print(f"💾 State persistence at {datetime.now()}")
@event_manager.on("aborting")
async def monitor_abort(data):
print(f"🛑 Crawler aborting: {data.reason if data else 'Unknown'}")
await self.generate_report()
async def start_monitoring(self):
"""Start monitoring session."""
self.start_time = datetime.now()
print(f"📊 Monitoring started at {self.start_time}")
async def record_request_start(self, url: str):
"""Record request start time."""
return time.time()
async def record_request_end(self, start_time: float, success: bool):
"""Record request completion."""
duration = (time.time() - start_time) * 1000 # Convert to ms
self.request_times.append(duration)
if success:
self.success_count += 1
else:
self.error_count += 1
async def generate_report(self):
"""Generate monitoring report."""
if not self.start_time:
return
duration = datetime.now() - self.start_time
print(f"\n📊 Crawler Monitoring Report")
print(f"=" * 40)
print(f"Total Duration: {duration}")
print(f"Requests: {self.success_count + self.error_count}")
print(f" Success: {self.success_count}")
print(f" Errors: {self.error_count}")
if self.request_times:
avg_time = sum(self.request_times) / len(self.request_times)
min_time = min(self.request_times)
max_time = max(self.request_times)
print(f"Request Times:")
print(f" Average: {avg_time:.2f}ms")
print(f" Min: {min_time:.2f}ms")
print(f" Max: {max_time:.2f}ms")
if self.system_snapshots:
avg_cpu = sum(s['cpu_percent'] for s in self.system_snapshots) / len(self.system_snapshots)
avg_memory = sum(s['memory_mb'] for s in self.system_snapshots) / len(self.system_snapshots)
print(f"System Resources:")
print(f" Average CPU: {avg_cpu:.1f}%")
print(f" Average Memory: {avg_memory:.1f}MB")
print(f" Snapshots: {len(self.system_snapshots)}")
async def main():
monitor = CrawlerMonitor()
event_manager = LocalEventManager()
# Setup monitoring
monitor.setup_monitoring(event_manager)
crawler = HttpCrawler(
event_manager=event_manager,
max_requests_per_crawl=20
)
@crawler.router.default_handler
async def handler(context: HttpCrawlingContext):
start_time = await monitor.record_request_start(context.request.url)
try:
# Simulate processing
await asyncio.sleep(0.5)
data = {
'url': context.request.url,
'status': context.response.status_code,
'timestamp': datetime.now().isoformat()
}
await context.push_data(data)
await monitor.record_request_end(start_time, success=True)
except Exception:
await monitor.record_request_end(start_time, success=False)
raise
await monitor.start_monitoring()
# Start crawling
    # Use distinct URLs so deduplication does not collapse them to three requests
    urls = [f'https://httpbin.org/delay/{i % 3 + 1}?page={i}' for i in range(15)]
await crawler.run(urls)
# Generate final report
await monitor.generate_report()
asyncio.run(main())

Controlling a long-running workflow (pause, resume, stop) with custom events.
import asyncio
from datetime import datetime
from crawlee.events import LocalEventManager, EventData
class WorkflowEventData(EventData):
"""Event data for workflow control."""
    def __init__(self, step: str, data: dict | None = None):
self.step = step
self.data = data or {}
self.timestamp = datetime.now()
class WorkflowController:
"""Control crawler workflow using events."""
def __init__(self):
self.event_manager = LocalEventManager()
self.current_step = "idle"
self.workflow_data = {}
self.should_pause = False
self.should_stop = False
    async def emit_workflow_event(self, step: str, data: dict | None = None):
"""Emit workflow event."""
event_data = WorkflowEventData(step, data)
await self.event_manager.emit("workflow", event_data)
def setup_workflow_control(self):
"""Setup workflow event handlers."""
@self.event_manager.on("workflow")
async def workflow_handler(data: WorkflowEventData):
self.current_step = data.step
if data.step == "pause_requested":
self.should_pause = True
print("⏸️ Workflow pause requested")
elif data.step == "resume_requested":
self.should_pause = False
print("▶️ Workflow resume requested")
elif data.step == "stop_requested":
self.should_stop = True
print("🛑 Workflow stop requested")
elif data.step == "step_complete":
step_name = data.data.get('name', 'unknown')
print(f"✅ Step completed: {step_name}")
elif data.step == "error_occurred":
error_msg = data.data.get('error', 'Unknown error')
print(f"❌ Workflow error: {error_msg}")
async def wait_for_resume(self):
"""Wait while paused."""
while self.should_pause and not self.should_stop:
print("⏸️ Workflow paused, waiting for resume...")
await asyncio.sleep(1)
    async def check_workflow_control(self):
        """Check if workflow should continue, honoring pause and stop requests."""
        if self.should_pause:
            await self.wait_for_resume()
        # Re-check stop after any pause, since stop may be requested while paused
        if self.should_stop:
            await self.emit_workflow_event("workflow_stopped")
            return False
        return True
async def execute_step(self, step_name: str, step_func, *args, **kwargs):
"""Execute workflow step with control checks."""
if not await self.check_workflow_control():
return False
try:
await self.emit_workflow_event("step_start", {"name": step_name})
result = await step_func(*args, **kwargs)
await self.emit_workflow_event("step_complete", {
"name": step_name,
"result": result
})
return True
except Exception as e:
await self.emit_workflow_event("error_occurred", {
"step": step_name,
"error": str(e)
})
return False
async def run_workflow(self, steps: list):
"""Run workflow with event-based control."""
await self.emit_workflow_event("workflow_start", {"steps": len(steps)})
        for step_name, step_func, args, kwargs in steps:
success = await self.execute_step(step_name, step_func, *args, **kwargs)
if not success:
await self.emit_workflow_event("workflow_failed", {"failed_step": step_name})
return False
await self.emit_workflow_event("workflow_complete")
return True
async def main():
controller = WorkflowController()
controller.setup_workflow_control()
# Example workflow steps
async def fetch_data(url: str):
print(f"📥 Fetching data from {url}")
await asyncio.sleep(1)
return f"data_from_{url.split('//')[-1]}"
async def process_data(data: str):
print(f"⚙️ Processing {data}")
await asyncio.sleep(0.5)
return f"processed_{data}"
async def save_results(data: str):
print(f"💾 Saving {data}")
await asyncio.sleep(0.3)
return "saved"
# Define workflow
workflow_steps = [
("fetch_data", fetch_data, ("https://example.com/api",), {}),
("process_data", process_data, ("raw_data",), {}),
("save_results", save_results, ("processed_data",), {})
]
# Start workflow
workflow_task = asyncio.create_task(
controller.run_workflow(workflow_steps)
)
    # Simulate user control: request a pause while the first step is still running
    await asyncio.sleep(1)
await controller.emit_workflow_event("pause_requested")
# Resume after 3 seconds
await asyncio.sleep(3)
await controller.emit_workflow_event("resume_requested")
# Wait for workflow completion
success = await workflow_task
if success:
print("🎉 Workflow completed successfully")
else:
print("💥 Workflow failed")
asyncio.run(main())

Install with Tessl CLI
npx tessl i tessl/pypi-crawlee