or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

browser-automation.md, cli-tools.md, configuration.md, core-types.md, crawlers.md, error-handling.md, events.md, fingerprinting.md, http-clients.md, index.md, request-management.md, sessions.md, statistics.md, storage.md

docs/events.md

0

# Events

1

2

Event-driven architecture for hooking into crawler lifecycle events and implementing custom behaviors. The events system provides a way to react to various crawler states and implement cross-cutting concerns like logging, monitoring, and custom workflows.

3

4

## Capabilities

5

6

### Event Manager

7

8

Abstract base class for event management systems that handle event emission and listener registration.

9

10

```python { .api }

11

class EventManager:

12

async def emit(

13

self,

14

event_name: Event,

15

event_data: EventData | None = None

16

) -> None:

17

"""

18

Emit an event to all registered listeners.

19

20

Args:

21

event_name: Name/type of the event

22

event_data: Data associated with the event

23

"""

24

25

def on(

26

self,

27

event_name: Event,

28

listener: EventListener

29

) -> None:

30

"""

31

Register event listener for specific event.

32

33

Args:

34

event_name: Event to listen for

35

listener: Function to call when event occurs

36

"""

37

38

def off(

39

self,

40

event_name: Event,

41

listener: EventListener | None = None

42

) -> None:

43

"""

44

Remove event listener(s).

45

46

Args:

47

event_name: Event to stop listening for

48

listener: Specific listener to remove (None removes all)

49

"""

50

51

def once(

52

self,

53

event_name: Event,

54

listener: EventListener

55

) -> None:

56

"""

57

Register listener that only fires once.

58

59

Args:

60

event_name: Event to listen for once

61

listener: Function to call when event occurs

62

"""

63

```

64

65

### Local Event Manager

66

67

Local implementation of event manager for single-process event handling.

68

69

```python { .api }

70

class LocalEventManager(EventManager):

71

def __init__(self): ...

72

73

def get_listener_count(self, event_name: Event) -> int:

74

"""Get number of listeners for specific event."""

75

76

def get_event_names(self) -> list[Event]:

77

"""Get list of all events with registered listeners."""

78

79

def clear(self) -> None:

80

"""Remove all event listeners."""

81

```

82

83

## Event Types

84

85

### Core Events

86

87

Standard events emitted by crawlers during their lifecycle.

88

89

```python { .api }

90

Event = Literal[

91

"system_info", # System resource information

92

"persist_state", # State persistence request

93

"migrating", # Data migration event

94

"aborting", # Crawler abort/stop event

95

"exit" # Crawler exit event

96

]

97

```

98

99

### Event Data Types

100

101

Base event data containers for different types of events.

102

103

```python { .api }

104

class EventData:

105

"""Base class for all event data."""

106

pass

107

```

108

109

```python { .api }

110

class EventSystemInfoData(EventData):

111

def __init__(

112

self,

113

*,

114

cpu_usage_percent: float,

115

memory_usage_bytes: int,

116

event_loop_delay_ms: float | None = None,

117

created_at: datetime | None = None

118

): ...

119

120

@property

121

def cpu_usage_percent(self) -> float:

122

"""Current CPU usage percentage."""

123

124

@property

125

def memory_usage_bytes(self) -> int:

126

"""Current memory usage in bytes."""

127

128

@property

129

def memory_usage_mb(self) -> float:

130

"""Current memory usage in megabytes."""

131

132

@property

133

def event_loop_delay_ms(self) -> float | None:

134

"""Event loop delay in milliseconds."""

135

136

@property

137

def created_at(self) -> datetime:

138

"""Timestamp when data was created."""

139

```

140

141

```python { .api }

142

class EventPersistStateData(EventData):

143

def __init__(

144

self,

145

*,

146

is_migrating: bool = False,

147

created_at: datetime | None = None

148

): ...

149

150

@property

151

def is_migrating(self) -> bool:

152

"""Whether persistence is due to migration."""

153

154

@property

155

def created_at(self) -> datetime:

156

"""Timestamp when persistence was requested."""

157

```

158

159

```python { .api }

160

class EventMigratingData(EventData):

161

def __init__(

162

self,

163

*,

164

reason: str | None = None,

165

created_at: datetime | None = None

166

): ...

167

168

@property

169

def reason(self) -> str | None:

170

"""Reason for migration."""

171

172

@property

173

def created_at(self) -> datetime:

174

"""Timestamp when migration started."""

175

```

176

177

```python { .api }

178

class EventAbortingData(EventData):

179

def __init__(

180

self,

181

*,

182

reason: str | None = None,

183

error: Exception | None = None,

184

created_at: datetime | None = None

185

): ...

186

187

@property

188

def reason(self) -> str | None:

189

"""Reason for aborting."""

190

191

@property

192

def error(self) -> Exception | None:

193

"""Exception that caused abort (if any)."""

194

195

@property

196

def created_at(self) -> datetime:

197

"""Timestamp when abort occurred."""

198

```

199

200

```python { .api }

201

class EventExitData(EventData):

202

def __init__(

203

self,

204

*,

205

exit_code: int = 0,

206

reason: str | None = None,

207

created_at: datetime | None = None

208

): ...

209

210

@property

211

def exit_code(self) -> int:

212

"""Exit code (0 for success)."""

213

214

@property

215

def reason(self) -> str | None:

216

"""Reason for exit."""

217

218

@property

219

def created_at(self) -> datetime:

220

"""Timestamp when exit occurred."""

221

```

222

223

### Event Listener Type

224

225

Type definition for event listener functions.

226

227

```python { .api }

228

EventListener = Callable[[EventData | None], Awaitable[None] | None]

229

```

230

231

## Usage Examples

232

233

### Basic Event Handling

234

235

```python

236

import asyncio

237

from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

238

from crawlee.events import LocalEventManager, EventSystemInfoData

239

240

async def main():

241

# Create event manager

242

event_manager = LocalEventManager()

243

244

# Register event listeners

245

@event_manager.on("system_info")

246

async def system_info_handler(data: EventSystemInfoData):

247

print(f"System Info - CPU: {data.cpu_usage_percent:.1f}%, Memory: {data.memory_usage_mb:.1f}MB")

248

249

@event_manager.on("persist_state")

250

async def persist_state_handler(data):

251

print("State persistence requested")

252

if data and data.is_migrating:

253

print(" Reason: Migration")

254

255

@event_manager.on("aborting")

256

async def aborting_handler(data):

257

print(f"Crawler aborting: {data.reason if data else 'Unknown reason'}")

258

if data and data.error:

259

print(f" Error: {data.error}")

260

261

# Create crawler with event manager

262

crawler = HttpCrawler(

263

event_manager=event_manager,

264

max_requests_per_crawl=10

265

)

266

267

@crawler.router.default_handler

268

async def handler(context: HttpCrawlingContext):

269

data = {

270

'url': context.request.url,

271

'status': context.response.status_code

272

}

273

await context.push_data(data)

274

275

# Run crawler - events will be emitted during execution

276

urls = ['https://httpbin.org/delay/1'] * 5

277

await crawler.run(urls)

278

279

asyncio.run(main())

280

```

281

282

### Custom Event System

283

284

```python

285

import asyncio

286

from datetime import datetime

287

from crawlee.events import LocalEventManager, EventData

288

289

class CustomEventData(EventData):

290

"""Custom event data for application-specific events."""

291

292

def __init__(self, message: str, data: dict | None = None):

293

self.message = message

294

self.data = data or {}

295

self.timestamp = datetime.now()

296

297

class CrawlerWithCustomEvents:

298

"""Crawler wrapper that emits custom events."""

299

300

def __init__(self):

301

self.event_manager = LocalEventManager()

302

self.stats = {

303

'requests_processed': 0,

304

'requests_failed': 0

305

}

306

307

async def emit_custom_event(self, event_name: str, message: str, data: dict | None = None):

308

"""Emit custom event with data."""

309

event_data = CustomEventData(message, data)

310

await self.event_manager.emit(event_name, event_data)

311

312

async def process_request(self, url: str):

313

"""Simulate request processing with events."""

314

try:

315

# Emit start event

316

await self.emit_custom_event(

317

"request_start",

318

f"Starting request to {url}",

319

{"url": url}

320

)

321

322

# Simulate processing

323

await asyncio.sleep(0.5)

324

325

# Emit progress events

326

await self.emit_custom_event(

327

"request_progress",

328

f"Downloaded content from {url}",

329

{"url": url, "step": "download"}

330

)

331

332

await asyncio.sleep(0.3)

333

334

await self.emit_custom_event(

335

"request_progress",

336

f"Parsing content from {url}",

337

{"url": url, "step": "parse"}

338

)

339

340

# Success

341

self.stats['requests_processed'] += 1

342

await self.emit_custom_event(

343

"request_complete",

344

f"Successfully processed {url}",

345

{"url": url, "status": "success"}

346

)

347

348

except Exception as e:

349

# Failure

350

self.stats['requests_failed'] += 1

351

await self.emit_custom_event(

352

"request_error",

353

f"Failed to process {url}: {e}",

354

{"url": url, "error": str(e)}

355

)

356

357

async def run(self, urls: list[str]):

358

"""Run crawler with event emission."""

359

await self.emit_custom_event(

360

"crawl_start",

361

f"Starting crawl with {len(urls)} URLs",

362

{"url_count": len(urls)}

363

)

364

365

for url in urls:

366

await self.process_request(url)

367

368

await self.emit_custom_event(

369

"crawl_complete",

370

"Crawl completed",

371

{

372

"total_urls": len(urls),

373

"processed": self.stats['requests_processed'],

374

"failed": self.stats['requests_failed']

375

}

376

)

377

378

async def main():

379

crawler = CrawlerWithCustomEvents()

380

381

# Register event listeners

382

@crawler.event_manager.on("crawl_start")

383

async def crawl_start_handler(data: CustomEventData):

384

print(f"πŸš€ {data.message}")

385

print(f" URLs to process: {data.data['url_count']}")

386

387

@crawler.event_manager.on("request_start")

388

async def request_start_handler(data: CustomEventData):

389

print(f"πŸ“₯ {data.message}")

390

391

@crawler.event_manager.on("request_progress")

392

async def request_progress_handler(data: CustomEventData):

393

step = data.data.get('step', 'unknown')

394

url = data.data.get('url', 'unknown')

395

print(f"βš™οΈ {step.capitalize()}: {url}")

396

397

@crawler.event_manager.on("request_complete")

398

async def request_complete_handler(data: CustomEventData):

399

print(f"βœ… {data.message}")

400

401

@crawler.event_manager.on("request_error")

402

async def request_error_handler(data: CustomEventData):

403

print(f"❌ {data.message}")

404

405

@crawler.event_manager.on("crawl_complete")

406

async def crawl_complete_handler(data: CustomEventData):

407

print(f"πŸŽ‰ {data.message}")

408

print(f" Processed: {data.data['processed']}")

409

print(f" Failed: {data.data['failed']}")

410

411

# Run crawler

412

urls = [

413

'https://example.com/page1',

414

'https://example.com/page2',

415

'https://example.com/page3'

416

]

417

418

await crawler.run(urls)

419

420

asyncio.run(main())

421

```

422

423

### Event-Based Monitoring

424

425

```python

426

import asyncio

427

import time

428

from datetime import datetime, timedelta

429

from crawlee.crawlers import HttpCrawler, HttpCrawlingContext

430

from crawlee.events import LocalEventManager, EventSystemInfoData

431

432

class CrawlerMonitor:

433

"""Monitor crawler performance using events."""

434

435

def __init__(self):

436

self.start_time = None

437

self.request_times = []

438

self.system_snapshots = []

439

self.error_count = 0

440

self.success_count = 0

441

442

def setup_monitoring(self, event_manager: LocalEventManager):

443

"""Setup event listeners for monitoring."""

444

445

@event_manager.on("system_info")

446

async def monitor_system(data: EventSystemInfoData):

447

self.system_snapshots.append({

448

'timestamp': data.created_at,

449

'cpu_percent': data.cpu_usage_percent,

450

'memory_mb': data.memory_usage_mb,

451

'event_loop_delay': data.event_loop_delay_ms

452

})

453

454

@event_manager.on("persist_state")

455

async def monitor_persistence(data):

456

print(f"πŸ’Ύ State persistence at {datetime.now()}")

457

458

@event_manager.on("aborting")

459

async def monitor_abort(data):

460

print(f"πŸ›‘ Crawler aborting: {data.reason if data else 'Unknown'}")

461

await self.generate_report()

462

463

async def start_monitoring(self):

464

"""Start monitoring session."""

465

self.start_time = datetime.now()

466

print(f"πŸ“Š Monitoring started at {self.start_time}")

467

468

async def record_request_start(self, url: str):

469

"""Record request start time."""

470

return time.time()

471

472

async def record_request_end(self, start_time: float, success: bool):

473

"""Record request completion."""

474

duration = (time.time() - start_time) * 1000 # Convert to ms

475

self.request_times.append(duration)

476

477

if success:

478

self.success_count += 1

479

else:

480

self.error_count += 1

481

482

async def generate_report(self):

483

"""Generate monitoring report."""

484

if not self.start_time:

485

return

486

487

duration = datetime.now() - self.start_time

488

489

print(f"\nπŸ“Š Crawler Monitoring Report")

490

print(f"=" * 40)

491

print(f"Total Duration: {duration}")

492

print(f"Requests: {self.success_count + self.error_count}")

493

print(f" Success: {self.success_count}")

494

print(f" Errors: {self.error_count}")

495

496

if self.request_times:

497

avg_time = sum(self.request_times) / len(self.request_times)

498

min_time = min(self.request_times)

499

max_time = max(self.request_times)

500

501

print(f"Request Times:")

502

print(f" Average: {avg_time:.2f}ms")

503

print(f" Min: {min_time:.2f}ms")

504

print(f" Max: {max_time:.2f}ms")

505

506

if self.system_snapshots:

507

avg_cpu = sum(s['cpu_percent'] for s in self.system_snapshots) / len(self.system_snapshots)

508

avg_memory = sum(s['memory_mb'] for s in self.system_snapshots) / len(self.system_snapshots)

509

510

print(f"System Resources:")

511

print(f" Average CPU: {avg_cpu:.1f}%")

512

print(f" Average Memory: {avg_memory:.1f}MB")

513

print(f" Snapshots: {len(self.system_snapshots)}")

514

515

async def main():

516

monitor = CrawlerMonitor()

517

event_manager = LocalEventManager()

518

519

# Setup monitoring

520

monitor.setup_monitoring(event_manager)

521

522

crawler = HttpCrawler(

523

event_manager=event_manager,

524

max_requests_per_crawl=20

525

)

526

527

@crawler.router.default_handler

528

async def handler(context: HttpCrawlingContext):

529

start_time = await monitor.record_request_start(context.request.url)

530

531

try:

532

# Simulate processing

533

await asyncio.sleep(0.5)

534

535

data = {

536

'url': context.request.url,

537

'status': context.response.status_code,

538

'timestamp': datetime.now().isoformat()

539

}

540

541

await context.push_data(data)

542

await monitor.record_request_end(start_time, success=True)

543

544

except Exception:

545

await monitor.record_request_end(start_time, success=False)

546

raise

547

548

await monitor.start_monitoring()

549

550

# Start crawling

551

urls = [f'https://httpbin.org/delay/{i%3+1}' for i in range(15)]

552

await crawler.run(urls)

553

554

# Generate final report

555

await monitor.generate_report()

556

557

asyncio.run(main())

558

```

559

560

### Event-Based Workflow Control

561

562

```python

563

import asyncio

564

from datetime import datetime
from crawlee.events import LocalEventManager, EventData

565

566

class WorkflowEventData(EventData):

567

"""Event data for workflow control."""

568

569

def __init__(self, step: str, data: dict | None = None):

570

self.step = step

571

self.data = data or {}

572

self.timestamp = datetime.now()

573

574

class WorkflowController:

575

"""Control crawler workflow using events."""

576

577

def __init__(self):

578

self.event_manager = LocalEventManager()

579

self.current_step = "idle"

580

self.workflow_data = {}

581

self.should_pause = False

582

self.should_stop = False

583

584

async def emit_workflow_event(self, step: str, data: dict | None = None):

585

"""Emit workflow event."""

586

event_data = WorkflowEventData(step, data)

587

await self.event_manager.emit("workflow", event_data)

588

589

def setup_workflow_control(self):

590

"""Setup workflow event handlers."""

591

592

@self.event_manager.on("workflow")

593

async def workflow_handler(data: WorkflowEventData):

594

self.current_step = data.step

595

596

if data.step == "pause_requested":

597

self.should_pause = True

598

print("⏸️ Workflow pause requested")

599

600

elif data.step == "resume_requested":

601

self.should_pause = False

602

print("▢️ Workflow resume requested")

603

604

elif data.step == "stop_requested":

605

self.should_stop = True

606

print("πŸ›‘ Workflow stop requested")

607

608

elif data.step == "step_complete":

609

step_name = data.data.get('name', 'unknown')

610

print(f"βœ… Step completed: {step_name}")

611

612

elif data.step == "error_occurred":

613

error_msg = data.data.get('error', 'Unknown error')

614

print(f"❌ Workflow error: {error_msg}")

615

616

async def wait_for_resume(self):

617

"""Wait while paused."""

618

while self.should_pause and not self.should_stop:

619

print("⏸️ Workflow paused, waiting for resume...")

620

await asyncio.sleep(1)

621

622

async def check_workflow_control(self):

623

"""Check if workflow should continue."""

624

if self.should_stop:

625

await self.emit_workflow_event("workflow_stopped")

626

return False

627

628

if self.should_pause:

629

await self.wait_for_resume()

630

631

return True

632

633

async def execute_step(self, step_name: str, step_func, *args, **kwargs):

634

"""Execute workflow step with control checks."""

635

if not await self.check_workflow_control():

636

return False

637

638

try:

639

await self.emit_workflow_event("step_start", {"name": step_name})

640

641

result = await step_func(*args, **kwargs)

642

643

await self.emit_workflow_event("step_complete", {

644

"name": step_name,

645

"result": result

646

})

647

648

return True

649

650

except Exception as e:

651

await self.emit_workflow_event("error_occurred", {

652

"step": step_name,

653

"error": str(e)

654

})

655

return False

656

657

async def run_workflow(self, steps: list):

658

"""Run workflow with event-based control."""

659

await self.emit_workflow_event("workflow_start", {"steps": len(steps)})

660

661

for i, (step_name, step_func, args, kwargs) in enumerate(steps):

662

success = await self.execute_step(step_name, step_func, *args, **kwargs)

663

664

if not success:

665

await self.emit_workflow_event("workflow_failed", {"failed_step": step_name})

666

return False

667

668

await self.emit_workflow_event("workflow_complete")

669

return True

670

671

async def main():

672

controller = WorkflowController()

673

controller.setup_workflow_control()

674

675

# Example workflow steps

676

async def fetch_data(url: str):

677

print(f"πŸ“₯ Fetching data from {url}")

678

await asyncio.sleep(1)

679

return f"data_from_{url.split('//')[-1]}"

680

681

async def process_data(data: str):

682

print(f"βš™οΈ Processing {data}")

683

await asyncio.sleep(0.5)

684

return f"processed_{data}"

685

686

async def save_results(data: str):

687

print(f"πŸ’Ύ Saving {data}")

688

await asyncio.sleep(0.3)

689

return "saved"

690

691

# Define workflow

692

workflow_steps = [

693

("fetch_data", fetch_data, ("https://example.com/api",), {}),

694

("process_data", process_data, ("raw_data",), {}),

695

("save_results", save_results, ("processed_data",), {})

696

]

697

698

# Start workflow

699

workflow_task = asyncio.create_task(

700

controller.run_workflow(workflow_steps)

701

)

702

703

# Simulate user control after 2 seconds

704

await asyncio.sleep(2)

705

await controller.emit_workflow_event("pause_requested")

706

707

# Resume after 3 seconds

708

await asyncio.sleep(3)

709

await controller.emit_workflow_event("resume_requested")

710

711

# Wait for workflow completion

712

success = await workflow_task

713

714

if success:

715

print("πŸŽ‰ Workflow completed successfully")

716

else:

717

print("πŸ’₯ Workflow failed")

718

719

from datetime import datetime

720

asyncio.run(main())

721

```