# Events

Event-driven architecture for hooking into crawler lifecycle events and implementing custom behaviors. The events system provides a way to react to various crawler states and implement cross-cutting concerns like logging, monitoring, and custom workflows.

## Capabilities

### Event Manager

Abstract base class for event management systems that handle event emission and listener registration.

```python { .api }
class EventManager:
12
async def emit(
13
self,
14
event_name: Event,
15
event_data: EventData | None = None
16
) -> None:
17
"""
18
Emit an event to all registered listeners.
19
20
Args:
21
event_name: Name/type of the event
22
event_data: Data associated with the event
23
"""
24
25
def on(
26
self,
27
event_name: Event,
28
listener: EventListener
29
) -> None:
30
"""
31
Register event listener for specific event.
32
33
Args:
34
event_name: Event to listen for
35
listener: Function to call when event occurs
36
"""
37
38
def off(
39
self,
40
event_name: Event,
41
listener: EventListener | None = None
42
) -> None:
43
"""
44
Remove event listener(s).
45
46
Args:
47
event_name: Event to stop listening for
48
listener: Specific listener to remove (None removes all)
49
"""
50
51
def once(
52
self,
53
event_name: Event,
54
listener: EventListener
55
) -> None:
56
"""
57
Register listener that only fires once.
58
59
Args:
60
event_name: Event to listen for once
61
listener: Function to call when event occurs
62
"""
63
```

### Local Event Manager

Local implementation of event manager for single-process event handling.

```python { .api }
class LocalEventManager(EventManager):
71
def __init__(self): ...
72
73
def get_listener_count(self, event_name: Event) -> int:
74
"""Get number of listeners for specific event."""
75
76
def get_event_names(self) -> list[Event]:
77
"""Get list of all events with registered listeners."""
78
79
def clear(self) -> None:
80
"""Remove all event listeners."""
81
```

## Event Types

### Core Events

Standard events emitted by crawlers during their lifecycle.

```python { .api }
Event = Literal[
91
"system_info", # System resource information
92
"persist_state", # State persistence request
93
"migrating", # Data migration event
94
"aborting", # Crawler abort/stop event
95
"exit" # Crawler exit event
96
]
97
```

### Event Data Types

Base event data containers for different types of events.

```python { .api }
class EventData:
105
"""Base class for all event data."""
106
pass
107
```

```python { .api }
class EventSystemInfoData(EventData):
111
def __init__(
112
self,
113
*,
114
cpu_usage_percent: float,
115
memory_usage_bytes: int,
116
event_loop_delay_ms: float | None = None,
117
created_at: datetime | None = None
118
): ...
119
120
@property
121
def cpu_usage_percent(self) -> float:
122
"""Current CPU usage percentage."""
123
124
@property
125
def memory_usage_bytes(self) -> int:
126
"""Current memory usage in bytes."""
127
128
@property
129
def memory_usage_mb(self) -> float:
130
"""Current memory usage in megabytes."""
131
132
@property
133
def event_loop_delay_ms(self) -> float | None:
134
"""Event loop delay in milliseconds."""
135
136
@property
137
def created_at(self) -> datetime:
138
"""Timestamp when data was created."""
139
```

```python { .api }
class EventPersistStateData(EventData):
143
def __init__(
144
self,
145
*,
146
is_migrating: bool = False,
147
created_at: datetime | None = None
148
): ...
149
150
@property
151
def is_migrating(self) -> bool:
152
"""Whether persistence is due to migration."""
153
154
@property
155
def created_at(self) -> datetime:
156
"""Timestamp when persistence was requested."""
157
```

```python { .api }
class EventMigratingData(EventData):
161
def __init__(
162
self,
163
*,
164
reason: str | None = None,
165
created_at: datetime | None = None
166
): ...
167
168
@property
169
def reason(self) -> str | None:
170
"""Reason for migration."""
171
172
@property
173
def created_at(self) -> datetime:
174
"""Timestamp when migration started."""
175
```

```python { .api }
class EventAbortingData(EventData):
179
def __init__(
180
self,
181
*,
182
reason: str | None = None,
183
error: Exception | None = None,
184
created_at: datetime | None = None
185
): ...
186
187
@property
188
def reason(self) -> str | None:
189
"""Reason for aborting."""
190
191
@property
192
def error(self) -> Exception | None:
193
"""Exception that caused abort (if any)."""
194
195
@property
196
def created_at(self) -> datetime:
197
"""Timestamp when abort occurred."""
198
```

```python { .api }
class EventExitData(EventData):
202
def __init__(
203
self,
204
*,
205
exit_code: int = 0,
206
reason: str | None = None,
207
created_at: datetime | None = None
208
): ...
209
210
@property
211
def exit_code(self) -> int:
212
"""Exit code (0 for success)."""
213
214
@property
215
def reason(self) -> str | None:
216
"""Reason for exit."""
217
218
@property
219
def created_at(self) -> datetime:
220
"""Timestamp when exit occurred."""
221
```

### Event Listener Type

Type definition for event listener functions.

```python { .api }
EventListener = Callable[[EventData | None], Awaitable[None] | None]
229
```

## Usage Examples

### Basic Event Handling

```python
import asyncio
237
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
238
from crawlee.events import LocalEventManager, EventSystemInfoData
239
240
async def main():
241
# Create event manager
242
event_manager = LocalEventManager()
243
244
# Register event listeners
245
@event_manager.on("system_info")
246
async def system_info_handler(data: EventSystemInfoData):
247
print(f"System Info - CPU: {data.cpu_usage_percent:.1f}%, Memory: {data.memory_usage_mb:.1f}MB")
248
249
@event_manager.on("persist_state")
250
async def persist_state_handler(data):
251
print("State persistence requested")
252
if data and data.is_migrating:
253
print(" Reason: Migration")
254
255
@event_manager.on("aborting")
256
async def aborting_handler(data):
257
print(f"Crawler aborting: {data.reason if data else 'Unknown reason'}")
258
if data and data.error:
259
print(f" Error: {data.error}")
260
261
# Create crawler with event manager
262
crawler = HttpCrawler(
263
event_manager=event_manager,
264
max_requests_per_crawl=10
265
)
266
267
@crawler.router.default_handler
268
async def handler(context: HttpCrawlingContext):
269
data = {
270
'url': context.request.url,
271
'status': context.response.status_code
272
}
273
await context.push_data(data)
274
275
# Run crawler - events will be emitted during execution
276
urls = ['https://httpbin.org/delay/1'] * 5
277
await crawler.run(urls)
278
279
asyncio.run(main())
280
```

### Custom Event System

```python
import asyncio
286
from datetime import datetime
287
from crawlee.events import LocalEventManager, EventData
288
289
class CustomEventData(EventData):
290
"""Custom event data for application-specific events."""
291
292
def __init__(self, message: str, data: dict = None):
293
self.message = message
294
self.data = data or {}
295
self.timestamp = datetime.now()
296
297
class CrawlerWithCustomEvents:
298
"""Crawler wrapper that emits custom events."""
299
300
def __init__(self):
301
self.event_manager = LocalEventManager()
302
self.stats = {
303
'requests_processed': 0,
304
'requests_failed': 0
305
}
306
307
async def emit_custom_event(self, event_name: str, message: str, data: dict = None):
308
"""Emit custom event with data."""
309
event_data = CustomEventData(message, data)
310
await self.event_manager.emit(event_name, event_data)
311
312
async def process_request(self, url: str):
313
"""Simulate request processing with events."""
314
try:
315
# Emit start event
316
await self.emit_custom_event(
317
"request_start",
318
f"Starting request to {url}",
319
{"url": url}
320
)
321
322
# Simulate processing
323
await asyncio.sleep(0.5)
324
325
# Emit progress events
326
await self.emit_custom_event(
327
"request_progress",
328
f"Downloaded content from {url}",
329
{"url": url, "step": "download"}
330
)
331
332
await asyncio.sleep(0.3)
333
334
await self.emit_custom_event(
335
"request_progress",
336
f"Parsing content from {url}",
337
{"url": url, "step": "parse"}
338
)
339
340
# Success
341
self.stats['requests_processed'] += 1
342
await self.emit_custom_event(
343
"request_complete",
344
f"Successfully processed {url}",
345
{"url": url, "status": "success"}
346
)
347
348
except Exception as e:
349
# Failure
350
self.stats['requests_failed'] += 1
351
await self.emit_custom_event(
352
"request_error",
353
f"Failed to process {url}: {e}",
354
{"url": url, "error": str(e)}
355
)
356
357
async def run(self, urls: list[str]):
358
"""Run crawler with event emission."""
359
await self.emit_custom_event(
360
"crawl_start",
361
f"Starting crawl with {len(urls)} URLs",
362
{"url_count": len(urls)}
363
)
364
365
for url in urls:
366
await self.process_request(url)
367
368
await self.emit_custom_event(
369
"crawl_complete",
370
"Crawl completed",
371
{
372
"total_urls": len(urls),
373
"processed": self.stats['requests_processed'],
374
"failed": self.stats['requests_failed']
375
}
376
)
377
378
async def main():
379
crawler = CrawlerWithCustomEvents()
380
381
# Register event listeners
382
@crawler.event_manager.on("crawl_start")
383
async def crawl_start_handler(data: CustomEventData):
384
print(f"π {data.message}")
385
print(f" URLs to process: {data.data['url_count']}")
386
387
@crawler.event_manager.on("request_start")
388
async def request_start_handler(data: CustomEventData):
389
print(f"π₯ {data.message}")
390
391
@crawler.event_manager.on("request_progress")
392
async def request_progress_handler(data: CustomEventData):
393
step = data.data.get('step', 'unknown')
394
url = data.data.get('url', 'unknown')
395
print(f"βοΈ {step.capitalize()}: {url}")
396
397
@crawler.event_manager.on("request_complete")
398
async def request_complete_handler(data: CustomEventData):
399
print(f"β {data.message}")
400
401
@crawler.event_manager.on("request_error")
402
async def request_error_handler(data: CustomEventData):
403
print(f"β {data.message}")
404
405
@crawler.event_manager.on("crawl_complete")
406
async def crawl_complete_handler(data: CustomEventData):
407
print(f"π {data.message}")
408
print(f" Processed: {data.data['processed']}")
409
print(f" Failed: {data.data['failed']}")
410
411
# Run crawler
412
urls = [
413
'https://example.com/page1',
414
'https://example.com/page2',
415
'https://example.com/page3'
416
]
417
418
await crawler.run(urls)
419
420
asyncio.run(main())
421
```

### Event-Based Monitoring

```python
import asyncio
427
import time
428
from datetime import datetime, timedelta
429
from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
430
from crawlee.events import LocalEventManager, EventSystemInfoData
431
432
class CrawlerMonitor:
433
"""Monitor crawler performance using events."""
434
435
def __init__(self):
436
self.start_time = None
437
self.request_times = []
438
self.system_snapshots = []
439
self.error_count = 0
440
self.success_count = 0
441
442
def setup_monitoring(self, event_manager: LocalEventManager):
443
"""Setup event listeners for monitoring."""
444
445
@event_manager.on("system_info")
446
async def monitor_system(data: EventSystemInfoData):
447
self.system_snapshots.append({
448
'timestamp': data.created_at,
449
'cpu_percent': data.cpu_usage_percent,
450
'memory_mb': data.memory_usage_mb,
451
'event_loop_delay': data.event_loop_delay_ms
452
})
453
454
@event_manager.on("persist_state")
455
async def monitor_persistence(data):
456
print(f"πΎ State persistence at {datetime.now()}")
457
458
@event_manager.on("aborting")
459
async def monitor_abort(data):
460
print(f"π Crawler aborting: {data.reason if data else 'Unknown'}")
461
await self.generate_report()
462
463
async def start_monitoring(self):
464
"""Start monitoring session."""
465
self.start_time = datetime.now()
466
print(f"π Monitoring started at {self.start_time}")
467
468
async def record_request_start(self, url: str):
469
"""Record request start time."""
470
return time.time()
471
472
async def record_request_end(self, start_time: float, success: bool):
473
"""Record request completion."""
474
duration = (time.time() - start_time) * 1000 # Convert to ms
475
self.request_times.append(duration)
476
477
if success:
478
self.success_count += 1
479
else:
480
self.error_count += 1
481
482
async def generate_report(self):
483
"""Generate monitoring report."""
484
if not self.start_time:
485
return
486
487
duration = datetime.now() - self.start_time
488
489
print(f"\nπ Crawler Monitoring Report")
490
print(f"=" * 40)
491
print(f"Total Duration: {duration}")
492
print(f"Requests: {self.success_count + self.error_count}")
493
print(f" Success: {self.success_count}")
494
print(f" Errors: {self.error_count}")
495
496
if self.request_times:
497
avg_time = sum(self.request_times) / len(self.request_times)
498
min_time = min(self.request_times)
499
max_time = max(self.request_times)
500
501
print(f"Request Times:")
502
print(f" Average: {avg_time:.2f}ms")
503
print(f" Min: {min_time:.2f}ms")
504
print(f" Max: {max_time:.2f}ms")
505
506
if self.system_snapshots:
507
avg_cpu = sum(s['cpu_percent'] for s in self.system_snapshots) / len(self.system_snapshots)
508
avg_memory = sum(s['memory_mb'] for s in self.system_snapshots) / len(self.system_snapshots)
509
510
print(f"System Resources:")
511
print(f" Average CPU: {avg_cpu:.1f}%")
512
print(f" Average Memory: {avg_memory:.1f}MB")
513
print(f" Snapshots: {len(self.system_snapshots)}")
514
515
async def main():
516
monitor = CrawlerMonitor()
517
event_manager = LocalEventManager()
518
519
# Setup monitoring
520
monitor.setup_monitoring(event_manager)
521
522
crawler = HttpCrawler(
523
event_manager=event_manager,
524
max_requests_per_crawl=20
525
)
526
527
@crawler.router.default_handler
528
async def handler(context: HttpCrawlingContext):
529
start_time = await monitor.record_request_start(context.request.url)
530
531
try:
532
# Simulate processing
533
await asyncio.sleep(0.5)
534
535
data = {
536
'url': context.request.url,
537
'status': context.response.status_code,
538
'timestamp': datetime.now().isoformat()
539
}
540
541
await context.push_data(data)
542
await monitor.record_request_end(start_time, success=True)
543
544
except Exception:
545
await monitor.record_request_end(start_time, success=False)
546
raise
547
548
await monitor.start_monitoring()
549
550
# Start crawling
551
urls = [f'https://httpbin.org/delay/{i%3+1}' for i in range(15)]
552
await crawler.run(urls)
553
554
# Generate final report
555
await monitor.generate_report()
556
557
asyncio.run(main())
558
```

### Event-Based Workflow Control

```python
import asyncio
564
from crawlee.events import LocalEventManager, EventData
565
566
class WorkflowEventData(EventData):
567
"""Event data for workflow control."""
568
569
def __init__(self, step: str, data: dict = None):
570
self.step = step
571
self.data = data or {}
572
self.timestamp = datetime.now()
573
574
class WorkflowController:
575
"""Control crawler workflow using events."""
576
577
def __init__(self):
578
self.event_manager = LocalEventManager()
579
self.current_step = "idle"
580
self.workflow_data = {}
581
self.should_pause = False
582
self.should_stop = False
583
584
async def emit_workflow_event(self, step: str, data: dict = None):
585
"""Emit workflow event."""
586
event_data = WorkflowEventData(step, data)
587
await self.event_manager.emit("workflow", event_data)
588
589
def setup_workflow_control(self):
590
"""Setup workflow event handlers."""
591
592
@self.event_manager.on("workflow")
593
async def workflow_handler(data: WorkflowEventData):
594
self.current_step = data.step
595
596
if data.step == "pause_requested":
597
self.should_pause = True
598
print("βΈοΈ Workflow pause requested")
599
600
elif data.step == "resume_requested":
601
self.should_pause = False
602
print("βΆοΈ Workflow resume requested")
603
604
elif data.step == "stop_requested":
605
self.should_stop = True
606
print("π Workflow stop requested")
607
608
elif data.step == "step_complete":
609
step_name = data.data.get('name', 'unknown')
610
print(f"β Step completed: {step_name}")
611
612
elif data.step == "error_occurred":
613
error_msg = data.data.get('error', 'Unknown error')
614
print(f"β Workflow error: {error_msg}")
615
616
async def wait_for_resume(self):
617
"""Wait while paused."""
618
while self.should_pause and not self.should_stop:
619
print("βΈοΈ Workflow paused, waiting for resume...")
620
await asyncio.sleep(1)
621
622
async def check_workflow_control(self):
623
"""Check if workflow should continue."""
624
if self.should_stop:
625
await self.emit_workflow_event("workflow_stopped")
626
return False
627
628
if self.should_pause:
629
await self.wait_for_resume()
630
631
return True
632
633
async def execute_step(self, step_name: str, step_func, *args, **kwargs):
634
"""Execute workflow step with control checks."""
635
if not await self.check_workflow_control():
636
return False
637
638
try:
639
await self.emit_workflow_event("step_start", {"name": step_name})
640
641
result = await step_func(*args, **kwargs)
642
643
await self.emit_workflow_event("step_complete", {
644
"name": step_name,
645
"result": result
646
})
647
648
return True
649
650
except Exception as e:
651
await self.emit_workflow_event("error_occurred", {
652
"step": step_name,
653
"error": str(e)
654
})
655
return False
656
657
async def run_workflow(self, steps: list):
658
"""Run workflow with event-based control."""
659
await self.emit_workflow_event("workflow_start", {"steps": len(steps)})
660
661
for i, (step_name, step_func, args, kwargs) in enumerate(steps):
662
success = await self.execute_step(step_name, step_func, *args, **kwargs)
663
664
if not success:
665
await self.emit_workflow_event("workflow_failed", {"failed_step": step_name})
666
return False
667
668
await self.emit_workflow_event("workflow_complete")
669
return True
670
671
async def main():
672
controller = WorkflowController()
673
controller.setup_workflow_control()
674
675
# Example workflow steps
676
async def fetch_data(url: str):
677
print(f"π₯ Fetching data from {url}")
678
await asyncio.sleep(1)
679
return f"data_from_{url.split('//')[-1]}"
680
681
async def process_data(data: str):
682
print(f"βοΈ Processing {data}")
683
await asyncio.sleep(0.5)
684
return f"processed_{data}"
685
686
async def save_results(data: str):
687
print(f"πΎ Saving {data}")
688
await asyncio.sleep(0.3)
689
return "saved"
690
691
# Define workflow
692
workflow_steps = [
693
("fetch_data", fetch_data, ("https://example.com/api",), {}),
694
("process_data", process_data, ("raw_data",), {}),
695
("save_results", save_results, ("processed_data",), {})
696
]
697
698
# Start workflow
699
workflow_task = asyncio.create_task(
700
controller.run_workflow(workflow_steps)
701
)
702
703
# Simulate user control after 2 seconds
704
await asyncio.sleep(2)
705
await controller.emit_workflow_event("pause_requested")
706
707
# Resume after 3 seconds
708
await asyncio.sleep(3)
709
await controller.emit_workflow_event("resume_requested")
710
711
# Wait for workflow completion
712
success = await workflow_task
713
714
if success:
715
print("π Workflow completed successfully")
716
else:
717
print("π₯ Workflow failed")
718
719
from datetime import datetime
720
asyncio.run(main())
721
```